  • About Flume configuration loading

    Our project recently started using Flume, so I read through some of its source code.

    Startup script:

      nohup bin/flume-ng agent -n tsdbflume -c conf -f conf/<config-file>.conf -Dflume.root.logger=DEBUG,console &

    The flume-ng script sets FLUME_AGENT_CLASS="org.apache.flume.node.Application", so Application is the entry point.

    I drew a sequence diagram of flume-agent startup:

    It is a simplified startup sequence; everything in the diagram lives in the flume-node project.

    The core step is creating the objects described in the configuration file, which is done in AbstractConfigurationProvider.getConfiguration(). Here is the code:

      public MaterializedConfiguration getConfiguration() {
        MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
        // getFlumeConfiguration() is the key step: it loads the whole
        // configuration, as explained in the code further below.
        FlumeConfiguration fconfig = getFlumeConfiguration();
        AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
        if (agentConf != null) {
          Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
          Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
          Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
          try {
            loadChannels(agentConf, channelComponentMap);
            loadSources(agentConf, channelComponentMap, sourceRunnerMap);
            loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
            Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
            for (String channelName : channelNames) {
              ChannelComponent channelComponent = channelComponentMap.get(channelName);
              if (channelComponent.components.isEmpty()) {
                LOGGER.warn(String.format("Channel %s has no components connected" +
                    " and has been removed.", channelName));
                channelComponentMap.remove(channelName);
                Map<String, Channel> nameChannelMap =
                    channelCache.get(channelComponent.channel.getClass());
                if (nameChannelMap != null) {
                  nameChannelMap.remove(channelName);
                }
              } else {
                LOGGER.info(String.format("Channel %s connected to %s",
                    channelName, channelComponent.components.toString()));
                conf.addChannel(channelName, channelComponent.channel);
              }
            }
            for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
              conf.addSourceRunner(entry.getKey(), entry.getValue());
            }
            for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
              conf.addSinkRunner(entry.getKey(), entry.getValue());
            }
          } catch (InstantiationException ex) {
            LOGGER.error("Failed to instantiate component", ex);
          } finally {
            channelComponentMap.clear();
            sourceRunnerMap.clear();
            sinkRunnerMap.clear();
          }
        } else {
          LOGGER.warn("No configuration found for this host:{}", getAgentName());
        }
        return conf;
      }
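
    One detail in getConfiguration() that is easy to miss is the pruning loop: channels with no source or sink attached are dropped, and the loop walks a copy of the key set so that removing entries is safe. A minimal sketch of just that idea follows. The class ChannelPruneSketch and the map of plain strings are hypothetical stand-ins, not Flume types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; it only imitates the pruning loop, not Flume itself.
final class ChannelPruneSketch {

  // Key: channel name. Value: names of the sources/sinks attached to it.
  static void prune(Map<String, List<String>> componentsByChannel) {
    // Iterate over a copy of the keys so entries can be removed while looping.
    for (String channelName : new HashSet<>(componentsByChannel.keySet())) {
      if (componentsByChannel.get(channelName).isEmpty()) {
        componentsByChannel.remove(channelName);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, List<String>> channels = new HashMap<>();
    channels.put("c1", Arrays.asList("r1", "k1")); // used by a source and a sink
    channels.put("c2", new ArrayList<>());         // nothing attached
    prune(channels);
    System.out.println(channels.keySet());
  }
}
```

    Looping directly over componentsByChannel.keySet() while removing from the map would risk a ConcurrentModificationException, which is presumably why the original code copies the key set first.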

    Next, how FlumeConfiguration is actually loaded.

    PropertiesFileConfigurationProvider.getFlumeConfiguration()

      @Override
      public FlumeConfiguration getFlumeConfiguration() {
        BufferedReader reader = null;
        try {
          reader = new BufferedReader(new FileReader(file));
          Properties properties = new Properties();
          properties.load(reader);
          return new FlumeConfiguration(toMap(properties));
        } catch (IOException ex) {
          LOGGER.error("Unable to load file:" + file
              + " (I/O failure) - Exception follows.", ex);
        } finally {
          if (reader != null) {
            try {
              reader.close();
            } catch (IOException ex) {
              LOGGER.warn(
                  "Unable to close file reader for file: " + file, ex);
            }
          }
        }
        return new FlumeConfiguration(new HashMap<String, String>());
      }
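
    To see what reaches the FlumeConfiguration constructor, here is a minimal standalone sketch of the same idea: read properties-format text and flatten it into a Map<String, String>. The class PropertiesToMapSketch is hypothetical, the sample keys are only illustrative, and the loop merely approximates what the toMap helper is used for above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical demo class; not part of Flume.
final class PropertiesToMapSketch {

  // Parse properties-format text and copy every entry into a plain map.
  static Map<String, String> load(String text) {
    Properties properties = new Properties();
    try {
      properties.load(new StringReader(text));
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
    Map<String, String> result = new HashMap<>();
    for (String name : properties.stringPropertyNames()) {
      result.put(name, properties.getProperty(name));
    }
    return result;
  }

  public static void main(String[] args) {
    // Illustrative content; a real agent file has many more keys.
    String text = "tsdbflume.sources = r1\n"
        + "tsdbflume.sources.r1.type = netcat\n";
    System.out.println(load(text));
  }
}
```

    Every key is still a flat dotted string at this point; the hierarchy (agent, component, setting) is only recovered later by the constructor.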

    The constructor invoked by new FlumeConfiguration(toMap(properties)) is shown below; this is where the content actually gets loaded:

      /**
       * Creates a populated Flume Configuration object.
       * The argument holds the key-value pairs read from the configuration file.
       */
      public FlumeConfiguration(Map<String, String> properties) {
        agentConfigMap = new HashMap<String, AgentConfiguration>();
        errors = new LinkedList<FlumeConfigurationError>();
        // Construct the in-memory component hierarchy
        for (String name : properties.keySet()) {
          String value = properties.get(name);

          // addRawProperty populates agentConfigMap.
          // Step 1: entries go into the *ContextMap fields of AgentConfiguration.
          if (!addRawProperty(name, value)) {
            logger.warn("Configuration property ignored: " + name + " = " + value);
          }
        }
        // Now iterate thru the agentContext and create agent configs and add them
        // to agentConfigMap

        // validate and remove improperly configured components
        // Step 2: entries go into the *ConfigMap fields of AgentConfiguration.
        validateConfiguration();
      }

    Next, step into FlumeConfiguration.addRawProperty(name, value):

      private boolean addRawProperty(String name, String value) {
        // Null names and values not supported
        if (name == null || value == null) {
          errors
              .add(new FlumeConfigurationError("", "",
                  FlumeConfigurationErrorType.AGENT_NAME_MISSING,
                  ErrorOrWarning.ERROR));
          return false;
        }
    
        // Empty values are not supported
        if (value.trim().length() == 0) {
          errors
              .add(new FlumeConfigurationError(name, "",
                  FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
                  ErrorOrWarning.ERROR));
          return false;
        }
    
        // Remove leading and trailing spaces
        name = name.trim();
        value = value.trim();
    
        int index = name.indexOf('.');
    
        // All configuration keys must have a prefix defined as agent name
        if (index == -1) {
          errors
              .add(new FlumeConfigurationError(name, "",
                  FlumeConfigurationErrorType.AGENT_NAME_MISSING,
                  ErrorOrWarning.ERROR));
          return false;
        }
    
        String agentName = name.substring(0, index);
    
        // Agent name must be specified for all properties
        if (agentName.length() == 0) {
          errors
              .add(new FlumeConfigurationError(name, "",
                  FlumeConfigurationErrorType.AGENT_NAME_MISSING,
                  ErrorOrWarning.ERROR));
          return false;
        }
    
        String configKey = name.substring(index + 1);
    
        // Configuration key must be specified for every property
        if (configKey.length() == 0) {
          errors
              .add(new FlumeConfigurationError(name, "",
                  FlumeConfigurationErrorType.PROPERTY_NAME_NULL,
                  ErrorOrWarning.ERROR));
          return false;
        }
    
        AgentConfiguration aconf = agentConfigMap.get(agentName);
    
        // Create the AgentConfiguration on first use and store it in
        // FlumeConfiguration's Map<String, AgentConfiguration> agentConfigMap
        if (aconf == null) {
          aconf = new AgentConfiguration(agentName, errors);
          agentConfigMap.put(agentName, aconf);
        }
    
        // Each configuration key must begin with one of the three prefixes:
        // sources, sinks, or channels.
        // In the end the key-value pair is stored in the AgentConfiguration
        return aconf.addProperty(configKey, value);
      }
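
    Stripped of the error bookkeeping, addRawProperty splits each key at the first dot: the part before it is the agent name, and everything after it is the config key handed to the AgentConfiguration. A minimal sketch of that split, where RawKeySplitSketch is a hypothetical class and null stands in for the "return false" branches:

```java
// Hypothetical demo class; it mirrors only the string splitting,
// not Flume's error reporting.
final class RawKeySplitSketch {

  // Returns {agentName, configKey}, or null when the key would be rejected.
  static String[] split(String name) {
    if (name == null) {
      return null;
    }
    name = name.trim();
    int index = name.indexOf('.');
    if (index == -1) {
      return null; // no agent-name prefix at all
    }
    String agentName = name.substring(0, index);
    String configKey = name.substring(index + 1);
    if (agentName.isEmpty() || configKey.isEmpty()) {
      return null; // ".foo" or "agent." carry no usable information
    }
    return new String[] {agentName, configKey};
  }

  public static void main(String[] args) {
    String[] parts = split("tsdbflume.sources.r1.type");
    System.out.println(parts[0] + " | " + parts[1]);
  }
}
```

    Only the first dot matters here; the remaining dots in the config key are interpreted later, inside addProperty.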

    The pair finally lands in AgentConfiguration.addProperty(String key, String value); the branch that handles sources looks like this:

          ComponentNameAndConfigKey cnck = parseConfigKey(key,
              BasicConfigurationConstants.CONFIG_SOURCES_PREFIX);
    
          if (cnck != null) {
            // it is a source
            String name = cnck.getComponentName();
            Context srcConf = sourceContextMap.get(name);
    
            if (srcConf == null) {
              srcConf = new Context();
              sourceContextMap.put(name, srcConf);
            }
            // sourceContextMap maps the source name to a Context that holds
            // (cnck.getConfigKey(), value)
            srcConf.put(cnck.getConfigKey(), value);
            return true;
          }
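
    The snippet relies on parseConfigKey to separate the component name from the remaining key. The sketch below shows one way such a split could work. It assumes the prefix passed in is "sources." (with the trailing dot), and both ConfigKeyParseSketch and the nested-map "context" are hypothetical stand-ins for Flume's ComponentNameAndConfigKey and Context.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; an approximation, not Flume's parseConfigKey.
final class ConfigKeyParseSketch {

  // Stand-in for sourceContextMap: component name -> (config key -> value).
  private final Map<String, Map<String, String>> contextMap = new HashMap<>();

  // Returns {componentName, configKey}, or null if the key does not match.
  static String[] parse(String key, String prefix) {
    if (!key.startsWith(prefix)) {
      return null;
    }
    String rest = key.substring(prefix.length());
    int dot = rest.indexOf('.');
    if (dot <= 0 || dot == rest.length() - 1) {
      return null; // missing component name or missing config key
    }
    return new String[] {rest.substring(0, dot), rest.substring(dot + 1)};
  }

  // Mirrors the get-or-create-then-put pattern used for srcConf above.
  boolean addSourceProperty(String key, String value) {
    String[] parsed = parse(key, "sources.");
    if (parsed == null) {
      return false;
    }
    contextMap.computeIfAbsent(parsed[0], n -> new HashMap<>()).put(parsed[1], value);
    return true;
  }

  Map<String, Map<String, String>> contexts() {
    return contextMap;
  }

  public static void main(String[] args) {
    ConfigKeyParseSketch sketch = new ConfigKeyParseSketch();
    sketch.addSourceProperty("sources.r1.type", "netcat");
    sketch.addSourceProperty("sources.r1.port", "44444");
    System.out.println(sketch.contexts());
  }
}
```

    A key such as "sources.r1.type" therefore ends up as the entry ("type", "netcat") inside the context for r1.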

    So everything ends up in the matching *ContextMap of AgentConfiguration:

      public static class AgentConfiguration {
    
        private final String agentName;
        private String sources;
        private String sinks;
        private String channels;
        private String sinkgroups;
    
        private final Map<String, ComponentConfiguration> sourceConfigMap;
        private final Map<String, ComponentConfiguration> sinkConfigMap;
        private final Map<String, ComponentConfiguration> channelConfigMap;
        private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
    
        private Map<String, Context> sourceContextMap;
        private Map<String, Context> sinkContextMap;
        private Map<String, Context> channelContextMap;
        private Map<String, Context> sinkGroupContextMap;
    
        private Set<String> sinkSet;
        private Set<String> sourceSet;
        private Set<String> channelSet;
        private Set<String> sinkgroupSet;
    
        private final List<FlumeConfigurationError> errorList;
    
        // ... other code omitted
        
    }

    This is the confusing part, with calls bouncing back and forth, so let's recap:

    First, FlumeConfiguration.addRawProperty performs the put:

      FlumeConfiguration.agentConfigMap.put(agentName, AgentConfiguration conf);

    The AgentConfiguration object is created here:

      aconf = new AgentConfiguration(agentName, errors);

    Then it calls

      aconf.addProperty(configKey, value);

    which inserts the key and value into the matching *ContextMap of the AgentConfiguration object. That completes the storing of the key-value pair.

    In the end, every key-value pair is held in an AgentConfiguration object.
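
    The recap can be condensed into a small sketch that groups a flat property map by agent name, creating the per-agent container on first use. AgentGroupingSketch is a hypothetical class, and a nested map stands in for AgentConfiguration; keys that addRawProperty would reject are simply skipped here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo class; nested maps stand in for AgentConfiguration objects.
final class AgentGroupingSketch {

  static Map<String, Map<String, String>> group(Map<String, String> properties) {
    Map<String, Map<String, String>> byAgent = new TreeMap<>();
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      String name = entry.getKey().trim();
      String value = entry.getValue().trim();
      int index = name.indexOf('.');
      if (value.isEmpty() || index <= 0 || index == name.length() - 1) {
        continue; // would be reported as an error and ignored
      }
      String agentName = name.substring(0, index);
      String configKey = name.substring(index + 1);
      // Get-or-create, like the "if (aconf == null)" block in addRawProperty.
      byAgent.computeIfAbsent(agentName, a -> new TreeMap<>()).put(configKey, value);
    }
    return byAgent;
  }

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    properties.put("tsdbflume.sources", "r1");
    properties.put("tsdbflume.channels", "c1");
    properties.put("tsdbflume.sources.r1.channels", "c1");
    System.out.println(group(properties));
  }
}
```

    One file can describe several agents this way; the -n option on the command line then decides which agent's entry is used.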

    Populating AgentConfiguration happens in two steps, both in the FlumeConfiguration constructor:

    1: addRawProperty(name, value) inserts each key-value pair into the matching *ContextMap of the AgentConfiguration object;

    2: validateConfiguration(); let's follow that code next.

    validateConfiguration -> aconf.isValid() -> validateChannels(channelSet)

    The code comments already explain it in fair detail:

        /**
         * If it is a known component it will do the full validation required for
         * that component, else it will do the validation required for that class.
         */
        private Set<String> validateChannels(Set<String> channelSet) {
          Iterator<String> iter = channelSet.iterator();
          Map<String, Context> newContextMap = new HashMap<String, Context>();
          ChannelConfiguration conf = null;
          /*
           * The logic for the following code:
           *
           * Is it a known component?
           *  -Yes: Get the ChannelType and set the string name of that to
           *        config and set configSpecified to true.
           *  -No.Look for config type for the given component:
           *      -Config Found:
           *        Set config to the type mentioned, set configSpecified to true
           *      -No Config found:
           *        Set config to OTHER, configSpecified to false,
           *        do basic validation. Leave the context in the
           *        contextMap to process later. Setting it to other returns
           *        a vanilla configuration(Source/Sink/Channel Configuration),
           *        which does basic syntactic validation. This object is not
           *        put into the map, so the context is retained which can be
           *        picked up - this is meant for older classes which don't
           *        implement ConfigurableComponent.
           */
          while (iter.hasNext()) {
            String channelName = iter.next();
            Context channelContext = channelContextMap.get(channelName);
            // Context exists in map.
            if (channelContext != null) {
              // Get the configuration object for the channel:
              ChannelType chType = getKnownChannel(channelContext.getString(
                  BasicConfigurationConstants.CONFIG_TYPE));
              boolean configSpecified = false;
              String config = null;
              // Not a known channel - cannot do specific validation to this channel
              if (chType == null) {
                config = channelContext.getString(BasicConfigurationConstants.CONFIG_CONFIG);
                if (config == null || config.isEmpty()) {
                  config = "OTHER";
                } else {
                  configSpecified = true;
                }
              } else {
                config = chType.toString().toUpperCase(Locale.ENGLISH);
                configSpecified = true;
              }
    
              try {
                conf =
                    (ChannelConfiguration) ComponentConfigurationFactory.create(
                        channelName, config, ComponentType.CHANNEL);
                logger.debug("Created channel " + channelName);
                if (conf != null) {
                  conf.configure(channelContext);
                }
                if ((configSpecified && conf.isNotFoundConfigClass()) ||
                    !configSpecified) {
                  newContextMap.put(channelName, channelContext);
                } else if (configSpecified) {
                  channelConfigMap.put(channelName, conf);
                }
                if (conf != null) {
                  errorList.addAll(conf.getErrors());
                }
              } catch (ConfigurationException e) {
                // Could not configure channel - skip it.
                // No need to add to error list - already added before exception is
                // thrown
                if (conf != null) errorList.addAll(conf.getErrors());
                iter.remove();
                logger.warn("Could not configure channel " + channelName
                    + " due to: " + e.getMessage(), e);
    
              }
            } else {
              iter.remove();
              errorList.add(new FlumeConfigurationError(agentName, channelName,
                  FlumeConfigurationErrorType.CONFIG_ERROR, ErrorOrWarning.ERROR));
            }
          }
          channelContextMap = newContextMap;
          Set<String> tempchannelSet = new HashSet<String>();
          tempchannelSet.addAll(channelConfigMap.keySet());
          tempchannelSet.addAll(channelContextMap.keySet());
          channelSet.retainAll(tempchannelSet);
          return channelSet;
        }

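    validateChannels creates a ChannelConfiguration object according to each channel's type in the configuration file and inserts it into the channelConfigMap of AgentConfiguration. A channel for which no configuration class is specified or found keeps its Context in channelContextMap instead.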
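
    The branching in validateChannels boils down to one question per channel: does it end up in channelConfigMap or stay in channelContextMap? The sketch below restates only that decision. ChannelValidationSketch, its Destination enum, and the boolean inputs are hypothetical simplifications; in the real code they come from getKnownChannel(...), the "config" key of the Context, and conf.isNotFoundConfigClass().

```java
// Hypothetical demo class; it restates the decision logic only.
final class ChannelValidationSketch {

  enum Destination { CONFIG_MAP, CONTEXT_MAP }

  // knownType:        the channel type resolved to a known ChannelType
  // configValue:      value of the channel's "config" key, may be null
  // configClassFound: a configuration class could be found for it
  static Destination classify(boolean knownType, String configValue,
                              boolean configClassFound) {
    boolean configSpecified =
        knownType || (configValue != null && !configValue.isEmpty());
    if (!configSpecified || !configClassFound) {
      // Keep the raw Context so the component can be configured later.
      return Destination.CONTEXT_MAP;
    }
    return Destination.CONFIG_MAP;
  }

  public static void main(String[] args) {
    System.out.println(classify(true, null, true));   // known type, class found
    System.out.println(classify(false, null, true));  // unknown type, no config key
    System.out.println(classify(false, "X", false));  // config named but not found
  }
}
```

    The fallback to the context map is what lets older or custom channel classes, which have no dedicated configuration class, still be loaded later.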

    At this point the FlumeConfiguration object is fully loaded. More to follow.

     

  • Original article: https://www.cnblogs.com/aquariusm/p/6118976.html