  • Flume-NG Source Code Analysis: The Startup Process

    Today we'll read through the Flume-NG source code to see how Flume starts up from end to end. Without further ado, on to the source!

    1 The main class is also the startup class

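    Below are the methods in Application that are involved in startup. You can read the rest of the class in the source yourself; after all, a source code analysis is about following the design, not every line.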

    org.apache.flume.node.Application

      /* main method */
      public static void main(String[] args) {
    
        try {
    
          boolean isZkConfigured = false;
    
          Options options = new Options();
    
          Option option = new Option("n", "name", true, "the name of this agent");
          option.setRequired(true);
          options.addOption(option);
    
          option = new Option("f", "conf-file", true,
              "specify a config file (required if -z missing)");
          option.setRequired(false);
          options.addOption(option);
    
          option = new Option(null, "no-reload-conf", false,
              "do not reload config file if changed");
          options.addOption(option);
    
          // Options for Zookeeper
          option = new Option("z", "zkConnString", true,
              "specify the ZooKeeper connection to use (required if -f missing)");
          option.setRequired(false);
          options.addOption(option);
    
          option = new Option("p", "zkBasePath", true,
              "specify the base path in ZooKeeper for agent configs");
          option.setRequired(false);
          options.addOption(option);
    
          option = new Option("h", "help", false, "display help text");
          options.addOption(option);
    
          CommandLineParser parser = new GnuParser();
          CommandLine commandLine = parser.parse(options, args);
    
          if (commandLine.hasOption('h')) {
            new HelpFormatter().printHelp("flume-ng agent", options, true);
            return;
          }
    
          String agentName = commandLine.getOptionValue('n');
          boolean reload = !commandLine.hasOption("no-reload-conf");
    
          if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
            isZkConfigured = true;
          }
          
          Application application = null;
          if (isZkConfigured) {
            // get options
            String zkConnectionStr = commandLine.getOptionValue('z');
            String baseZkPath = commandLine.getOptionValue('p');
    
            if (reload) {
              EventBus eventBus = new EventBus(agentName + "-event-bus");
              List<LifecycleAware> components = Lists.newArrayList();
              PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
                  new PollingZooKeeperConfigurationProvider(
                      agentName, zkConnectionStr, baseZkPath, eventBus);
              components.add(zookeeperConfigurationProvider);
              application = new Application(components);
              eventBus.register(application);
            } else {
              StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
                  new StaticZooKeeperConfigurationProvider(
                      agentName, zkConnectionStr, baseZkPath);
              application = new Application();
              application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
            }
          } else {
            File configurationFile = new File(commandLine.getOptionValue('f'));
            /*
             * Make sure the agent fails to start if the configuration file does not exist
             */
            if (!configurationFile.exists()) {
              // If command line invocation, then need to fail fast
              if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
                  null) {
                String path = configurationFile.getPath();
                try {
                  path = configurationFile.getCanonicalPath();
                } catch (IOException ex) {
                  logger.error("Failed to read canonical path for file: " + path,
                      ex);
                }
                throw new ParseException(
                    "The specified configuration file does not exist: " + path);
              }
            }
            List<LifecycleAware> components = Lists.newArrayList();
    
            if (reload) {
              EventBus eventBus = new EventBus(agentName + "-event-bus");
              PollingPropertiesFileConfigurationProvider configurationProvider =
                  new PollingPropertiesFileConfigurationProvider(
                      agentName, configurationFile, eventBus, 30);
              components.add(configurationProvider);
              application = new Application(components);
              eventBus.register(application);
            } else {
              PropertiesFileConfigurationProvider configurationProvider =
                  new PropertiesFileConfigurationProvider(agentName, configurationFile);
              application = new Application();
              application.handleConfigurationEvent(configurationProvider.getConfiguration());
            }
          }
          application.start();
    
          final Application appReference = application;
          Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
            @Override
            public void run() {
              appReference.stop();
            }
          });
    
        } catch (Exception e) {
          logger.error("A fatal error occurred while running. Exception follows.", e);
        }
      }
      
      /* start method */
      public synchronized void start() {
        for (LifecycleAware component : components) {
          supervisor.supervise(component,
              new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
        }
      }
    
      /* EventBus callback */
      @Subscribe
      public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
        stopAllComponents();
        startAllComponents(conf);
      }
    
      /* stop method */
      public synchronized void stop() {
        supervisor.stop();
        if (monitorServer != null) {
          monitorServer.stop();
        }
      }
    
      /* stop all components */
      private void stopAllComponents() {
        if (this.materializedConfiguration != null) {
          logger.info("Shutting down configuration: {}", this.materializedConfiguration);
          for (Entry<String, SourceRunner> entry :
               this.materializedConfiguration.getSourceRunners().entrySet()) {
            try {
              logger.info("Stopping Source " + entry.getKey());
              supervisor.unsupervise(entry.getValue());
            } catch (Exception e) {
              logger.error("Error while stopping {}", entry.getValue(), e);
            }
          }
    
          for (Entry<String, SinkRunner> entry :
               this.materializedConfiguration.getSinkRunners().entrySet()) {
            try {
              logger.info("Stopping Sink " + entry.getKey());
              supervisor.unsupervise(entry.getValue());
            } catch (Exception e) {
              logger.error("Error while stopping {}", entry.getValue(), e);
            }
          }
    
          for (Entry<String, Channel> entry :
               this.materializedConfiguration.getChannels().entrySet()) {
            try {
              logger.info("Stopping Channel " + entry.getKey());
              supervisor.unsupervise(entry.getValue());
            } catch (Exception e) {
              logger.error("Error while stopping {}", entry.getValue(), e);
            }
          }
        }
        if (monitorServer != null) {
          monitorServer.stop();
        }
      }
    
      /* start all components */
      private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
        logger.info("Starting new configuration:{}", materializedConfiguration);
    
        this.materializedConfiguration = materializedConfiguration;
        /* start Channels */
        for (Entry<String, Channel> entry :
            materializedConfiguration.getChannels().entrySet()) {
          try {
            logger.info("Starting Channel " + entry.getKey());
            supervisor.supervise(entry.getValue(),
                new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
          } catch (Exception e) {
            logger.error("Error while starting {}", entry.getValue(), e);
          }
        }
    
        /*
         * Wait for all channels to start.
         */
        for (Channel ch : materializedConfiguration.getChannels().values()) {
          while (ch.getLifecycleState() != LifecycleState.START
              && !supervisor.isComponentInErrorState(ch)) {
            try {
              logger.info("Waiting for channel: " + ch.getName() +
                  " to start. Sleeping for 500 ms");
              Thread.sleep(500);
            } catch (InterruptedException e) {
              logger.error("Interrupted while waiting for channel to start.", e);
              Throwables.propagate(e);
            }
          }
        }
    	
        /* start SinkRunners */
        for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
          try {
            logger.info("Starting Sink " + entry.getKey());
            supervisor.supervise(entry.getValue(),
                new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
          } catch (Exception e) {
            logger.error("Error while starting {}", entry.getValue(), e);
          }
        }
        /* start SourceRunners */
        for (Entry<String, SourceRunner> entry :
             materializedConfiguration.getSourceRunners().entrySet()) {
          try {
            logger.info("Starting Source " + entry.getKey());
            supervisor.supervise(entry.getValue(),
                new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
          } catch (Exception e) {
            logger.error("Error while starting {}", entry.getValue(), e);
          }
        }
    
        this.loadMonitoring();
      }
    

    1) The commandLine.hasOption('h') check: if the command line contains the -h (--help) option, print the help text and return.

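    2) The isZkConfigured check: if the command line carries ZooKeeper options (-z/--zkConnString, plus -p/--zkBasePath for the base path), read them and load the configuration stored in ZooKeeper through PollingZooKeeperConfigurationProvider or StaticZooKeeperConfigurationProvider. Otherwise, load the configuration file at the path given by -f through PollingPropertiesFileConfigurationProvider or PropertiesFileConfigurationProvider.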

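    3) The reload flag: whether the configuration comes from ZooKeeper or from a file, the program checks whether the command line contains --no-reload-conf, via reload = !commandLine.hasOption("no-reload-conf"). If reload is true, a polling provider is used and the configuration is watched for changes; when a change is detected, the existing components are stopped and started again. For the file-based provider the check interval is the last constructor argument, 30 (seconds); the ZooKeeper provider is not given an interval here. These details live in the XXProvider classes.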

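    4) In polling (reload) mode, application is registered with the EventBus, and Application.start() then starts the components (here, the provider). In load-once mode, handleConfigurationEvent() is called directly with the loaded configuration. handleConfigurationEvent() is also the EventBus callback (it is annotated with @Subscribe), so it is invoked whenever the eventBus executes post().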

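    5) In other words, Application.start() effectively calls the XXProvider's start() (through the supervisor), and that is where the real startup work is triggered.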

    6) Whether the configuration is reloaded or loaded once, the work is always done by handleConfigurationEvent(), which in turn calls stopAllComponents() and startAllComponents().

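    7) We'll skip stopAllComponents(). The interesting one is startAllComponents(), whose start order is deliberate: first the channels, then the SinkRunners, and finally the SourceRunners. Between the first two steps it also waits, checking every 500 ms, until every channel reports LifecycleState.START (or the supervisor reports it in an error state) before starting any sink.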
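    The reload path boils down to "post a configuration; the subscriber stops everything, then starts the new one". Below is a minimal JDK-only sketch of that callback flow, not Flume's code: MiniEventBus, MiniApplication and ProviderChoice are hypothetical names, MiniEventBus is a simplified stand-in for Guava's EventBus (which Flume actually uses, dispatching to @Subscribe methods), and a configuration is reduced to a String.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Guava's EventBus: a subscriber is just a Consumer,
// and post() delivers the event to every subscriber synchronously.
class MiniEventBus<E> {
  private final List<Consumer<E>> subscribers = new ArrayList<>();

  void register(Consumer<E> subscriber) {
    subscribers.add(subscriber);
  }

  void post(E event) {
    for (Consumer<E> subscriber : subscribers) {
      subscriber.accept(event);
    }
  }
}

// Hypothetical stand-in for Application: "starting"/"stopping" only appends to a log.
class MiniApplication {
  final List<String> log = new ArrayList<>();
  private String current; // plays the role of materializedConfiguration

  // Same shape as Application.handleConfigurationEvent(): stop all, then start the new config.
  synchronized void handleConfigurationEvent(String conf) {
    stopAllComponents();
    startAllComponents(conf);
  }

  private void stopAllComponents() {
    if (current != null) { // nothing to stop before the first configuration
      log.add("stop " + current);
    }
  }

  private void startAllComponents(String conf) {
    current = conf;
    log.add("start " + conf);
  }
}

// Mirrors the branch in main(): which provider is built for a given -z / --no-reload-conf combination.
class ProviderChoice {
  static String choose(boolean zkConfigured, boolean reload) {
    if (zkConfigured) {
      return reload ? "PollingZooKeeperConfigurationProvider" : "StaticZooKeeperConfigurationProvider";
    }
    return reload ? "PollingPropertiesFileConfigurationProvider" : "PropertiesFileConfigurationProvider";
  }
}
```

    Posting "conf-v1" and then "conf-v2" to a bus on which app::handleConfigurationEvent is registered leaves app.log as start conf-v1, stop conf-v1, start conf-v2: the first event has nothing to stop, and every later one tears the old configuration down before starting the new one. That is why a reload restarts all components.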

    Two questions come up here:
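    1) Why this start order? Channels start first because the channel is what links a source to a sink, and both the sink and the source check at startup that the channel they connect to exists, so the channel has to be up first. As for sinks starting before sources, my guess is this: the sink is the final consumer and the source is where data comes from, so starting the consumer before the producer keeps data from piling up in the channel. Demand before supply. That part is pure speculation on my part.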
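    2) What are SourceRunner and SinkRunner, and where did the sources and sinks go? Briefly: SourceRunner and SinkRunner are the driver classes that run a source and a sink. We'll cover them in detail in the next post, which analyzes sources, sinks and channels.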
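    The start order, together with the reverse order used by stopAllComponents() (sources, then sinks, then channels), can be illustrated with toy components. This is a JDK-only sketch under simplifying assumptions: ToyComponent and StartupOrder are hypothetical classes, start() and stop() only append to a shared log, and there is no supervisor or restart policy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy component; real Flume components implement LifecycleAware
// and are handed to the LifecycleSupervisor instead of being started inline.
class ToyComponent {
  enum State { IDLE, START, STOP }

  final String name;
  private final List<String> log;
  volatile State state = State.IDLE;

  ToyComponent(String name, List<String> log) {
    this.name = name;
    this.log = log;
  }

  void start() { log.add("start " + name); state = State.START; }
  void stop()  { log.add("stop " + name);  state = State.STOP; }
}

class StartupOrder {
  // Same order as startAllComponents(): channels, then sinks, then sources.
  static void startAll(List<ToyComponent> channels, List<ToyComponent> sinks, List<ToyComponent> sources) {
    for (ToyComponent ch : channels) {
      ch.start();
    }
    // Flume cannot assume a channel is up once supervise() returns, so it polls until
    // each channel reports START. Here start() is synchronous, so this loop exits at once.
    for (ToyComponent ch : channels) {
      while (ch.state != ToyComponent.State.START) {
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while waiting for channel " + ch.name, e);
        }
      }
    }
    for (ToyComponent sink : sinks) {
      sink.start();
    }
    for (ToyComponent source : sources) {
      source.start();
    }
  }

  // Same order as stopAllComponents(): sources, then sinks, then channels.
  static void stopAll(List<ToyComponent> channels, List<ToyComponent> sinks, List<ToyComponent> sources) {
    for (ToyComponent source : sources) {
      source.stop();
    }
    for (ToyComponent sink : sinks) {
      sink.stop();
    }
    for (ToyComponent ch : channels) {
      ch.stop();
    }
  }
}
```

    Starting one channel c1, sink k1 and source r1, then stopping them, logs start c1, start k1, start r1, stop r1, stop k1, stop c1. Stopping the producer first presumably serves the same goal as starting the consumer first: no new data enters a channel that nothing will drain.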

    2 XXProvider

    The provider classes do the main work when components are started. Let's use PollingPropertiesFileConfigurationProvider as the example. Here is the code:

    public class PollingPropertiesFileConfigurationProvider
        extends PropertiesFileConfigurationProvider
        implements LifecycleAware {
    
      private static final Logger LOGGER =
          LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class);
    
      private final EventBus eventBus;
      private final File file;
      private final int interval;
      private final CounterGroup counterGroup;
      private LifecycleState lifecycleState;
    
      private ScheduledExecutorService executorService;
    
      public PollingPropertiesFileConfigurationProvider(String agentName,
          File file, EventBus eventBus, int interval) {
        super(agentName, file);
        this.eventBus = eventBus;
        this.file = file;
        this.interval = interval;
        counterGroup = new CounterGroup();
        lifecycleState = LifecycleState.IDLE;
      }
    
      @Override
      public void start() {
        LOGGER.info("Configuration provider starting");
    
        Preconditions.checkState(file != null,
            "The parameter file must not be null");
    
        executorService = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
                    .build());
    
        FileWatcherRunnable fileWatcherRunnable =
            new FileWatcherRunnable(file, counterGroup);
    
        executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
            TimeUnit.SECONDS);
    
        lifecycleState = LifecycleState.START;
    
        LOGGER.debug("Configuration provider started");
      }
    
      @Override
      public void stop() {
        LOGGER.info("Configuration provider stopping");
    
        executorService.shutdown();
        try {
          while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
            LOGGER.debug("Waiting for file watcher to terminate");
          }
        } catch (InterruptedException e) {
          LOGGER.debug("Interrupted while waiting for file watcher to terminate");
          Thread.currentThread().interrupt();
        }
        lifecycleState = LifecycleState.STOP;
        LOGGER.debug("Configuration provider stopped");
      }
    
      @Override
      public synchronized LifecycleState getLifecycleState() {
        return lifecycleState;
      }
    
    
      @Override
      public String toString() {
        return "{ file:" + file + " counterGroup:" + counterGroup + "  provider:"
            + getClass().getCanonicalName() + " agentName:" + getAgentName() + " }";
      }
    
      public class FileWatcherRunnable implements Runnable {
    
        private final File file;
        private final CounterGroup counterGroup;
    
        private long lastChange;
    
        public FileWatcherRunnable(File file, CounterGroup counterGroup) {
          super();
          this.file = file;
          this.counterGroup = counterGroup;
          this.lastChange = 0L;
        }
    
        @Override
        public void run() {
          LOGGER.debug("Checking file:{} for changes", file);
    
          counterGroup.incrementAndGet("file.checks");
    
          long lastModified = file.lastModified();
    
          if (lastModified > lastChange) {
            LOGGER.info("Reloading configuration file:{}", file);
    
            counterGroup.incrementAndGet("file.loads");
    
            lastChange = lastModified;
    
            try {
              eventBus.post(getConfiguration());
            } catch (Exception e) {
              LOGGER.error("Failed to load configuration data. Exception follows.",
                  e);
            } catch (NoClassDefFoundError e) {
              LOGGER.error("Failed to start agent because dependencies were not " +
                  "found in classpath. Error follows.", e);
            } catch (Throwable t) {
              // caught because the caller does not handle or log Throwables
              LOGGER.error("Unhandled error", t);
            }
          }
        }
      }
    
    }
    

    1) First, let's look at start() in the Application class:

      public synchronized void start() {
        for (LifecycleAware component : components) {
          supervisor.supervise(component,
              new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
        }
      }
    

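    Here supervisor.supervise() is used to start each component. I won't walk you through the LifecycleSupervisor class; suffice it to say that internally this method still ends up calling the component's LifecycleAware start(). Since we've brought up the LifecycleAware interface, we should at least look at its code: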

    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public interface LifecycleAware {
    
      public void start();
      public void stop();
      public LifecycleState getLifecycleState();
    
    }
    

    It is very simple, just three methods: start(), stop() and getLifecycleState(). Yet most Flume classes that need to be started and stopped implement it.

    2) PollingPropertiesFileConfigurationProvider also implements LifecycleAware, so Application.start() is really calling PollingPropertiesFileConfigurationProvider.start().

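    3) In start(), executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, TimeUnit.SECONDS) schedules a task on a single-threaded executor: it runs immediately, then again interval seconds (30, as passed in by Application) after each run finishes. In other words, the configuration file is checked every 30 seconds. The task is the inner class FileWatcherRunnable.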

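    4) In FileWatcherRunnable.run(), file.lastModified() is compared with lastChange to decide whether the file has changed. If it has, eventBus.post(getConfiguration()) is called, and Application, which subscribes to the event, runs handleConfigurationEvent() to stop and restart all components. Because lastChange starts at 0, the very first run always takes this branch, which is how the initial configuration gets loaded at startup.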

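    5) Note that this class has no getConfiguration() of its own. The method is inherited: PollingPropertiesFileConfigurationProvider extends PropertiesFileConfigurationProvider, which in turn extends AbstractConfigurationProvider, where getConfiguration() is defined to build the configuration. I won't go through it here; have a look if you're interested.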
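    Items 2) to 4) can be condensed into a small, self-contained sketch of the polling idea. It is not Flume's class: PollingFileWatcher and SimpleLifecycle are hypothetical names, the interval is in milliseconds instead of seconds so it is quick to try out, and a Consumer<String> that receives the raw file contents stands in for eventBus.post(getConfiguration()).

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified stand-in for Flume's LifecycleAware (state is a String, not LifecycleState).
interface SimpleLifecycle {
  void start();
  void stop();
  String getLifecycleState();
}

// Hypothetical sketch of the polling idea; not Flume's PollingPropertiesFileConfigurationProvider.
class PollingFileWatcher implements SimpleLifecycle {
  private final File file;
  private final long intervalMillis;
  private final Consumer<String> onChange; // stands in for eventBus.post(getConfiguration())
  private ScheduledExecutorService executor;
  private long lastChange = 0L;            // 0, so the first check of an existing file always loads it
  private volatile String state = "IDLE";

  PollingFileWatcher(File file, long intervalMillis, Consumer<String> onChange) {
    this.file = file;
    this.intervalMillis = intervalMillis;
    this.onChange = onChange;
  }

  @Override
  public void start() {
    executor = Executors.newSingleThreadScheduledExecutor();
    // Run now, then wait intervalMillis after each check finishes before the next one.
    executor.scheduleWithFixedDelay(this::check, 0, intervalMillis, TimeUnit.MILLISECONDS);
    state = "START";
  }

  // One polling step: reload only if the modification time moved forward
  // (lastModified() returns 0 for a missing file, so nothing is loaded then).
  void check() {
    long lastModified = file.lastModified();
    if (lastModified > lastChange) {
      lastChange = lastModified;
      try {
        onChange.accept(new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8));
      } catch (IOException | RuntimeException e) {
        // Never let an exception escape: it would cancel all later scheduled runs.
        System.err.println("Failed to load " + file + ": " + e);
      }
    }
  }

  @Override
  public void stop() {
    if (executor != null) {
      executor.shutdown();
      try {
        while (!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) {
          // keep waiting for an in-flight check to finish
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    state = "STOP";
  }

  @Override
  public String getLifecycleState() {
    return state;
  }
}
```

    Calling check() twice on an unchanged file triggers only one load; bumping the modification time with File.setLastModified() makes the next check() load it again. Catching exceptions inside the task matters: if a run of a task scheduled with scheduleWithFixedDelay throws, the executor suppresses all later runs, so the watcher would silently stop polling. This is in line with Flume's run(), which catches even Throwable.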

    That's it for the analysis of the startup process. Unfortunately, line numbers can't be displayed here.

  • Original article: https://www.cnblogs.com/simple-focus/p/6522529.html