zoukankan      html  css  js  c++  java
  • Dubbo源码解析(七)之consumer初始化

    dubbo的consumer由ReferenceBean初始化,我们先来看一下这个类的层次结构:

     我们看到ReferenceBean实现了InitializingBean,所以我们先来看一下它的afterPropertiesSet方法实现:
    ReferenceBean:

    /**
     * Fills in any consumer, application, module, registry and monitor config
     * that has not been set yet, using beans found in the Spring application
     * context, and then creates the reference right away when init is enabled.
     *
     * <p>A bean counts as a default candidate when its {@code isDefault()} is
     * {@code null} or {@code true}.
     *
     * @throws IllegalStateException if more than one default candidate exists
     *                               for consumer, application, module or monitor
     * @throws Exception             whatever {@code getObject()} propagates
     */
    public void afterPropertiesSet() throws Exception {
        // --- ConsumerConfig ---
        if (getConsumer() == null) {
            Map<String, ConsumerConfig> consumerBeans = null;
            if (applicationContext != null) {
                consumerBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
            }
            if (consumerBeans != null && !consumerBeans.isEmpty()) {
                ConsumerConfig chosenConsumer = null;
                for (ConsumerConfig candidate : consumerBeans.values()) {
                    Boolean flag = candidate.isDefault();
                    if (flag != null && !flag.booleanValue()) {
                        continue; // explicitly marked as non-default
                    }
                    if (chosenConsumer != null) {
                        throw new IllegalStateException("Duplicate consumer configs: " + chosenConsumer + " and " + candidate);
                    }
                    chosenConsumer = candidate;
                }
                if (chosenConsumer != null) {
                    setConsumer(chosenConsumer);
                }
            }
        }

        // --- ApplicationConfig ---
        if (getApplication() == null
                && (getConsumer() == null || getConsumer().getApplication() == null)) {
            Map<String, ApplicationConfig> applicationBeans = null;
            if (applicationContext != null) {
                applicationBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
            }
            if (applicationBeans != null && !applicationBeans.isEmpty()) {
                ApplicationConfig chosenApplication = null;
                for (ApplicationConfig candidate : applicationBeans.values()) {
                    Boolean flag = candidate.isDefault();
                    if (flag != null && !flag.booleanValue()) {
                        continue;
                    }
                    if (chosenApplication != null) {
                        throw new IllegalStateException("Duplicate application configs: " + chosenApplication + " and " + candidate);
                    }
                    chosenApplication = candidate;
                }
                if (chosenApplication != null) {
                    setApplication(chosenApplication);
                }
            }
        }

        // --- ModuleConfig ---
        if (getModule() == null
                && (getConsumer() == null || getConsumer().getModule() == null)) {
            Map<String, ModuleConfig> moduleBeans = null;
            if (applicationContext != null) {
                moduleBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
            }
            if (moduleBeans != null && !moduleBeans.isEmpty()) {
                ModuleConfig chosenModule = null;
                for (ModuleConfig candidate : moduleBeans.values()) {
                    Boolean flag = candidate.isDefault();
                    if (flag != null && !flag.booleanValue()) {
                        continue;
                    }
                    if (chosenModule != null) {
                        throw new IllegalStateException("Duplicate module configs: " + chosenModule + " and " + candidate);
                    }
                    chosenModule = candidate;
                }
                if (chosenModule != null) {
                    setModule(chosenModule);
                }
            }
        }

        // --- RegistryConfig (several defaults are allowed, all are collected) ---
        if ((getRegistries() == null || getRegistries().isEmpty())
                && (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().isEmpty())
                && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {
            Map<String, RegistryConfig> registryBeans = null;
            if (applicationContext != null) {
                registryBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
            }
            if (registryBeans != null && !registryBeans.isEmpty()) {
                List<RegistryConfig> defaults = new ArrayList<RegistryConfig>();
                for (RegistryConfig candidate : registryBeans.values()) {
                    Boolean flag = candidate.isDefault();
                    if (flag == null || flag.booleanValue()) {
                        defaults.add(candidate);
                    }
                }
                if (!defaults.isEmpty()) {
                    super.setRegistries(defaults);
                }
            }
        }

        // --- MonitorConfig ---
        if (getMonitor() == null
                && (getConsumer() == null || getConsumer().getMonitor() == null)
                && (getApplication() == null || getApplication().getMonitor() == null)) {
            Map<String, MonitorConfig> monitorBeans = null;
            if (applicationContext != null) {
                monitorBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
            }
            if (monitorBeans != null && !monitorBeans.isEmpty()) {
                MonitorConfig chosenMonitor = null;
                for (MonitorConfig candidate : monitorBeans.values()) {
                    Boolean flag = candidate.isDefault();
                    if (flag != null && !flag.booleanValue()) {
                        continue;
                    }
                    if (chosenMonitor != null) {
                        throw new IllegalStateException("Duplicate monitor configs: " + chosenMonitor + " and " + candidate);
                    }
                    chosenMonitor = candidate;
                }
                if (chosenMonitor != null) {
                    setMonitor(chosenMonitor);
                }
            }
        }

        // The reference's own init flag wins; fall back to the consumer's when unset.
        Boolean eager = isInit();
        if (eager == null && getConsumer() != null) {
            eager = getConsumer().isInit();
        }
        if (eager != null && eager.booleanValue()) {
            // Not lazy: initialize immediately.
            getObject();
        }
    }
    ReferenceBean:
    /**
     * Returns the referenced object by delegating to {@code get()}.
     *
     * @return the value returned by {@code get()}
     * @throws Exception declared by the signature; propagates anything {@code get()} throws
     */
    public Object getObject() throws Exception {
        Object reference = get();
        return reference;
    }
    ReferenceConfig:
    /**
     * Returns the reference, running {@code init()} first when no reference
     * has been created yet.
     *
     * @return the current value of {@code ref} after the optional init
     * @throws IllegalStateException if this config has already been destroyed
     */
    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref != null) {
            return ref;
        }
        init(); // initialize on first use
        return ref;
    }
    ReferenceConfig:
    private void init() {
    if (initialized) {
    return;
    }
    initialized = true;
    if (interfaceName == null || interfaceName.length() == 0) {
    throw new IllegalStateException("<dubbo:reference interface="" /> interface not allow null!");
    }
    // 获取consumer全局配置
    checkDefault();
    // 追加properties配置,我们在provider初始化的文章中分析过,复用的是相同的方法
    appendProperties(this);
    if (getGeneric() == null && getConsumer() != null) {
    setGeneric(getConsumer().getGeneric());
    }
    if (ProtocolUtils.isGeneric(getGeneric())) {
    // 泛化服务
    interfaceClass = GenericService.class;
    } else {
    try {
    interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
    .getContextClassLoader());
    } catch (ClassNotFoundException e) {
    throw new IllegalStateException(e.getMessage(), e);
    }
    // 检查接口和方法,方法在接口中是否存在
    checkInterfaceAndMethods(interfaceClass, methods);
    }
    // 解析文件的应用
    String resolve = System.getProperty(interfaceName);
    String resolveFile = null;
    if (resolve == null || resolve.length() == 0) {
    resolveFile = System.getProperty("dubbo.resolve.file");
    if (resolveFile == null || resolveFile.length() == 0) {
    File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
    if (userResolveFile.exists()) {
    resolveFile = userResolveFile.getAbsolutePath();
    }
    }
    if (resolveFile != null && resolveFile.length() > 0) {
    Properties properties = new Properties();
    FileInputStream fis = null;
    try {
    fis = new FileInputStream(new File(resolveFile));
    properties.load(fis);
    } catch (IOException e) {
    throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
    } finally {
    try {
    if (null != fis) fis.close();
    } catch (IOException e) {
    logger.warn(e.getMessage(), e);
    }
    }
    resolve = properties.getProperty(interfaceName);
    }
    }
    if (resolve != null && resolve.length() > 0) {
    url = resolve;
    if (logger.isWarnEnabled()) {
    if (resolveFile != null && resolveFile.length() > 0) {
    logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
    } else {
    logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
    }
    }
    }
    // 最终确定各个配置对象
    if (consumer != null) {
    if (application == null) {
    application = consumer.getApplication();
    }
    if (module == null) {
    module = consumer.getModule();
    }
    if (registries == null) {
    registries = consumer.getRegistries();
    }
    if (monitor == null) {
    monitor = consumer.getMonitor();
    }
    }
    if (module != null) {
    if (registries == null) {
    registries = module.getRegistries();
    }
    if (monitor == null) {
    monitor = module.getMonitor();
    }
    }
    if (application != null) {
    if (registries == null) {
    registries = application.getRegistries();
    }
    if (monitor == null) {
    monitor = application.getMonitor();
    }
    }
    // 检查application配置,分析provider初始化时分析过
    checkApplication();
    // 检查stub、mock配置,分析provider初始化时分析过
    checkStubAndMock(interfaceClass);
    Map<String, String> map = new HashMap<String, String>();
    // 属性map
    Map<Object, Object> attributes = new HashMap<Object, Object>();
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
    map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    if (!isGeneric()) {
    // 校对接口版本
    String revision = Version.getVersion(interfaceClass, version);
    if (revision != null && revision.length() > 0) {
    map.put("revision", revision);
    }
    // 方法列表
    String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
    if (methods.length == 0) {
    logger.warn("NO method found in service interface " + interfaceClass.getName());
    map.put("methods", Constants.ANY_VALUE);
    } else {
    map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
    }
    }
    map.put(Constants.INTERFACE_KEY, interfaceName);
    // 为一些配置对象追加参数,分析provider初始化时分析过
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    String prifix = StringUtils.getServiceKey(map);
    if (methods != null && methods.size() > 0) {
    for (MethodConfig method : methods) {
    appendParameters(map, method, method.getName());
    String retryKey = method.getName() + ".retry";
    if (map.containsKey(retryKey)) {
    String retryValue = map.remove(retryKey);
    if ("false".equals(retryValue)) {
    // retry配置为false默认重试0次
    map.put(method.getName() + ".retries", "0");
    }
    }
    appendAttributes(attributes, method, prifix + "." + method.getName());
    // 检查配置冲突,将onreturn、onthrow、oninvoke方法名转换为方法
    checkAndConvertImplicitConfig(method, map, attributes);
    }
    }
    // 确定用于注册的ip
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    if (hostToRegistry == null || hostToRegistry.length() == 0) {
    hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
    throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
    }
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
    // 属性保存到系统上下文中
    StaticContext.getSystemContext().putAll(attributes);
    /* 创建代理 */
    ref = createProxy(map);
    // 获取唯一服务名称(由group分组、接口名称和版本拼接而成),创建consumer模型
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    // 添加唯一服务名称和consumer模型的映射
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }}
    这里有一些方法我们在分析provider初始化源码的时候已经分析过,它们复用的是相同的流程,这里就不再重复了。
    ReferenceConfig:
    private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer;
    if (isInjvm() == null) {
    // 如果指定了URL,不要做本地引用
    if (url != null && url.length() > 0) {
    isJvmRefer = false;
    } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
    // 默认情况下,引用本地服务,如果存在
    isJvmRefer = true;
    } else {
    isJvmRefer = false;
    }
    } else {
    isJvmRefer = isInjvm().booleanValue();
    }
    if (isJvmRefer) {
    URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
    // InjvmProtocol做本地引用
    invoker = refprotocol.refer(interfaceClass, url);
    if (logger.isInfoEnabled()) {
    logger.info("Using injvm service " + interfaceClass.getName());
    }
    } else {
    // 用户指定的URL,可以是点对点地址,也可以是注册中心的地址
    if (url != null && url.length() > 0) {
    String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
    if (us != null && us.length > 0) {
    for (String u : us) {
    URL url = URL.valueOf(u);
    if (url.getPath() == null || url.getPath().length() == 0) {
    url = url.setPath(interfaceName);
    }
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
    } else {
    urls.add(ClusterUtils.mergeUrl(url, map));
    }
    }
    }
    // 从注册中心的配置组合URL
    } else {
    // 加载注册中心配置,provider初始化文章中分析过
    List<URL> us = loadRegistries(false);
    if (us != null && us.size() > 0) {
    for (URL u : us) {
    // 加载监控中心地址
    URL monitorUrl = loadMonitor(u);
    if (monitorUrl != null) {
    map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
    }
    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
    }
    }
    if (urls == null || urls.size() == 0) {
    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address="..." /> to your spring config.");
    }
    }
    if (urls.size() == 1) {
    /* 单个url直接关联引用 */
    invoker = refprotocol.refer(interfaceClass, urls.get(0));
    } else {
    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    URL registryURL = null;
    for (URL url : urls) {
    /* 多个url循环关联引用返回invoker */
    invokers.add(refprotocol.refer(interfaceClass, url));
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
    registryURL = url; // 使用最后一个注册中心url
    }
    }
    if (registryURL != null) {
    // 仅在register的群集可用时才使用AvailableCluster
    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
    // AvailableCluster
    invoker = cluster.join(new StaticDirectory(u, invokers));
    } else {
    // 不是一个注册中心url
    invoker = cluster.join(new StaticDirectory(invokers));
    }
    }
    }
    Boolean c = check;
    if (c == null && consumer != null) {
    c = consumer.isCheck();
    }
    if (c == null) {
    c = true; // 默认要检查关联服务的provider是否存在
    }
    if (c && !invoker.isAvailable()) {
    throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
    logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    // javaassist生成动态代理类
    return (T) proxyFactory.getProxy(invoker);
    }}
    RegistryProtocol:
    /**
     * Refers a service through the registry described by {@code url}.
     *
     * <p>The URL's protocol is replaced by the value of its registry parameter
     * (with a default), and that parameter is removed. A request for
     * {@code RegistryService} itself is answered with an invoker around the
     * registry object. Otherwise the call is handed to {@code doRefer}, with
     * the mergeable cluster when the group lists several names or is
     * {@code "*"}, and with the {@code cluster} field in every other case.
     *
     * @param type the service interface
     * @param url  the registry URL carrying the encoded refer parameters
     * @return the invoker for the service
     * @throws RpcException declared by the signature
     */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        String registryProtocol = url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY);
        url = url.setProtocol(registryProtocol).removeParameter(Constants.REGISTRY_KEY);

        // Obtain the registry object.
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        String referQuery = url.getParameterAndDecoded(Constants.REFER_KEY);
        Map<String, String> referParams = StringUtils.parseQueryString(referQuery);
        String group = referParams.get(Constants.GROUP_KEY);
        boolean multiGroup = group != null
                && group.length() > 0
                && (Constants.COMMA_SPLIT_PATTERN.split(group).length > 1 || "*".equals(group));

        if (multiGroup) {
            // Several groups or the wildcard group.
            return doRefer(getMergeableCluster(), registry, type, url);
        }
        return doRefer(cluster, registry, type, url);
    }
    关于注册中心的相关操作,我们在Dubbo源码解析之registry注册中心这篇文章中已经分析过,这里不再赘述。
    RegistryProtocol:
    /**
     * Creates a registry-backed directory for the service, optionally
     * registers this consumer, subscribes to the provider, configurator and
     * router categories, and joins the directory into a single invoker.
     *
     * @param cluster  the cluster used to join the directory
     * @param registry the registry to register with and subscribe to
     * @param type     the service interface
     * @param url      the registry URL
     * @return the invoker produced by {@code cluster.join}
     */
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);

        // The register IP becomes the host of the subscribe URL and is taken out of the parameters.
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);

        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            // Register this side in the consumers category.
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }

        // Subscribe to the providers, configurators and routers categories.
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        // Per the original author's note: MergeableCluster for multiple groups,
        // FailoverCluster by default for a single group — not verifiable from this method alone.
        // Typed as Invoker<T> (was a raw Invoker) so the return needs no unchecked conversion.
        Invoker<T> invoker = cluster.join(directory);

        // Record the consumer in the registration table.
        ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
        return invoker;
    }
    ProviderConsumerRegTable:
    /**
     * Wraps the consumer invoker together with its URLs and directory and adds
     * the wrapper to the set kept for the consumer URL's service key.
     *
     * @param invoker           the consumer-side invoker
     * @param registryUrl       the registry URL the invoker was referred from
     * @param consumerUrl       the consumer URL; its service key is the lookup key
     * @param registryDirectory the directory backing the invoker
     */
    public static void registerConsuemr(Invoker invoker, URL registryUrl, URL consumerUrl, RegistryDirectory registryDirectory) {
        // Build the wrapper first, then work out where it belongs.
        ConsumerInvokerWrapper wrapped = new ConsumerInvokerWrapper(invoker, registryUrl, consumerUrl, registryDirectory);
        String serviceKey = consumerUrl.getServiceKey();

        Set<ConsumerInvokerWrapper> bucket = consumerInvokers.get(serviceKey);
        if (bucket == null) {
            // No set yet: install one if absent, then re-read whichever set is in the map.
            consumerInvokers.putIfAbsent(serviceKey, new ConcurrentHashSet<ConsumerInvokerWrapper>());
            bucket = consumerInvokers.get(serviceKey);
        }
        bucket.add(wrapped);
    }
    到这里,整个consumer初始化流程就分析完成了。
    
    
  • 相关阅读:
    【leetcode】Binary Search Tree Iterator
    【leetcode】Palindrome Partitioning II
    【leetcode】Best Time to Buy and Sell Stock III
    【leetcode】Best Time to Buy and Sell Stock II
    【leetcode】Longest Consecutive Sequence
    【leetcode】Factorial Trailing Zeroes
    【leetcode】Simplify Path
    【leetcode】Generate Parentheses
    【leetcode】Combination Sum II
    【leetcode】Combination Sum
  • 原文地址:https://www.cnblogs.com/lanblogs/p/15261906.html
Copyright © 2011-2022 走看看