  • Elasticsearch 6 Source Code Analysis (1): The Startup Process

    1. Go to the bin directory, which contains the elasticsearch shell script, and look at what it executes:

      exec \
        "$JAVA" \
        $ES_JAVA_OPTS \
        -Des.path.home="$ES_HOME" \
        -Des.path.conf="$ES_PATH_CONF" \
        -Des.distribution.flavor="$ES_DISTRIBUTION_FLAVOR" \
        -Des.distribution.type="$ES_DISTRIBUTION_TYPE" \
        -cp "$ES_CLASSPATH" \
        org.elasticsearch.bootstrap.Elasticsearch \
        "$@"

    As the last argument before "$@" shows, the main class is:

    org.elasticsearch.bootstrap.Elasticsearch
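
    The -D flags in the script become JVM system properties that the main class can read back at startup. The following is a minimal sketch of that mechanism only; the class StartupProperties and its requiredPath helper are hypothetical and are not part of Elasticsearch.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical demo class; it only illustrates how a -Dkey=value flag is read.
class StartupProperties {

    // Resolve a directory from a system property, failing fast if it is missing.
    static Path requiredPath(String key) {
        String value = System.getProperty(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("system property [" + key + "] must be set");
        }
        return Paths.get(value).toAbsolutePath().normalize();
    }

    public static void main(String[] args) {
        // Example invocation: java -Des.path.home=/opt/es StartupProperties.java
        System.out.println("home = " + requiredPath("es.path.home"));
    }
}
```

    Failing fast on a missing property keeps a misconfigured launch from surfacing later as an unrelated error.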

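    2. The main class Elasticsearch

    Starting from the main method, execution reaches the execute() method inherited from the parent class Command (the three-argument form shown here is declared in EnvironmentAwareCommand, which extends Command). Elasticsearch overrides this method: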
        @Override
        protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
            if (options.nonOptionArguments().isEmpty() == false) {
                throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
            }
            if (options.has(versionOption)) {
                final String versionOutput = String.format(
                        Locale.ROOT,
                        "Version: %s, Build: %s/%s/%s/%s, JVM: %s",
                        Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
                        Build.CURRENT.flavor().displayName(),
                        Build.CURRENT.type().displayName(),
                        Build.CURRENT.shortHash(),
                        Build.CURRENT.date(),
                        JvmInfo.jvmInfo().version());
                terminal.println(versionOutput);
                return;
            }
    
            final boolean daemonize = options.has(daemonizeOption);
            final Path pidFile = pidfileOption.value(options);
            final boolean quiet = options.has(quietOption);
    
            // a misconfigured java.io.tmpdir can cause hard-to-diagnose problems later, so reject it immediately
            try {
                env.validateTmpFile();
            } catch (IOException e) {
                throw new UserException(ExitCodes.CONFIG, e.getMessage());
            }
    
            try {
                init(daemonize, pidFile, quiet, env);
            } catch (NodeValidationException e) {
                throw new UserException(ExitCodes.CONFIG, e.getMessage());
            }
        }
    
        void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
            throws NodeValidationException, UserException {
            try {
                Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
            } catch (BootstrapException | RuntimeException e) {
                // format exceptions to the console in a special way
                // to avoid 2MB stacktraces from guice, etc.
                throw new StartupException(e);
            }
        }
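
    The flow above (parse options, run execute(), turn a UserException into a process exit code) is a template-method pattern. Below is a simplified sketch of that pattern under stated assumptions: every class name in it is a hypothetical stand-in, the option handling is reduced to a string check instead of a real option parser, and the USAGE value of 64 is assumed from the sysexits.h convention.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for the command classes discussed above.
class CommandSketch {

    // Carries the exit code that the process should terminate with.
    static class UserError extends Exception {
        final int exitCode;

        UserError(int exitCode, String message) {
            super(message);
            this.exitCode = exitCode;
        }
    }

    abstract static class Command {
        static final int OK = 0;
        static final int USAGE = 64; // assumed value, following sysexits.h

        // Template method: run the subclass logic and map failures to exit codes.
        final int run(String[] args) {
            try {
                execute(Arrays.asList(args));
                return OK;
            } catch (UserError e) {
                System.err.println("ERROR: " + e.getMessage());
                return e.exitCode;
            }
        }

        protected abstract void execute(List<String> args) throws UserError;
    }

    static class ServerCommand extends Command {
        boolean started = false;

        @Override
        protected void execute(List<String> args) throws UserError {
            for (String arg : args) {
                if (!arg.startsWith("-")) {
                    throw new UserError(USAGE, "Positional arguments not allowed, found [" + arg + "]");
                }
            }
            started = true; // the real class hands over to Bootstrap.init at this point
        }
    }

    public static void main(String[] args) {
        System.exit(new ServerCommand().run(args));
    }
}
```

    Keeping the exit-code mapping in the base class means each subcommand only has to throw, never call System.exit itself.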

    3. The startup class Bootstrap

    The init method:

      /**
         * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
         */
        static void init(
                final boolean foreground,
                final Path pidFile,
                final boolean quiet,
                final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
            // force the class initializer for BootstrapInfo to run before
            // the security manager is installed
            BootstrapInfo.init();
    
            INSTANCE = new Bootstrap();
    
            final SecureSettings keystore = loadSecureSettings(initialEnv);
            final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
    
            LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
            try {
                LogConfigurator.configure(environment);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
            if (environment.pidFile() != null) {
                try {
                    PidFile.create(environment.pidFile(), true);
                } catch (IOException e) {
                    throw new BootstrapException(e);
                }
            }
    
            final boolean closeStandardStreams = (foreground == false) || quiet;
            try {
                if (closeStandardStreams) {
                    final Logger rootLogger = LogManager.getRootLogger();
                    final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                    if (maybeConsoleAppender != null) {
                        Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                    }
                    closeSystOut();
                }
    
                // fail if somebody replaced the lucene jars
                checkLucene();
    
                // install the default uncaught exception handler; must be done before security is
                // initialized as we do not want to grant the runtime permission
                // setDefaultUncaughtExceptionHandler
                Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());
    
                INSTANCE.setup(true, environment);
    
                try {
                    // any secure settings must be read during node construction
                    IOUtils.close(keystore);
                } catch (IOException e) {
                    throw new BootstrapException(e);
                }
    
                INSTANCE.start();
    
                if (closeStandardStreams) {
                    closeSysError();
                }
            } catch (NodeValidationException | RuntimeException e) {
                // disable console logging, so user does not see the exception twice (jvm will show it already)
                final Logger rootLogger = LogManager.getRootLogger();
                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                if (foreground && maybeConsoleAppender != null) {
                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                }
                Logger logger = LogManager.getLogger(Bootstrap.class);
                // HACK, it sucks to do this, but we will run users out of disk space otherwise
                if (e instanceof CreationException) {
                    // guice: log the shortened exc to the log file
                    ByteArrayOutputStream os = new ByteArrayOutputStream();
                    PrintStream ps = null;
                    try {
                        ps = new PrintStream(os, false, "UTF-8");
                    } catch (UnsupportedEncodingException uee) {
                        assert false;
                        e.addSuppressed(uee);
                    }
                    new StartupException(e).printStackTrace(ps);
                    ps.flush();
                    try {
                        logger.error("Guice Exception: {}", os.toString("UTF-8"));
                    } catch (UnsupportedEncodingException uee) {
                        assert false;
                        e.addSuppressed(uee);
                    }
                } else if (e instanceof NodeValidationException) {
                    logger.error("node validation exception\n{}", e.getMessage());
                } else {
                    // full exception
                    logger.error("Exception", e);
                }
                // re-enable it if appropriate, so they can see any logging during the shutdown process
                if (foreground && maybeConsoleAppender != null) {
                    Loggers.addAppender(rootLogger, maybeConsoleAppender);
                }
    
                throw e;
            }
        }
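
    One step in init() installs a JVM-wide default handler through Thread.setDefaultUncaughtExceptionHandler before the node is set up. The sketch below shows what such a handler does in isolation; UncaughtHandlerSketch is a hypothetical class that merely records the failure, whereas the behavior of ElasticsearchUncaughtExceptionHandler itself is not shown in this post.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo of a default uncaught exception handler.
class UncaughtHandlerSketch {
    static final AtomicReference<String> lastFailure = new AtomicReference<>();

    // Install a handler that applies to every thread without its own handler.
    static void install() {
        Thread.setDefaultUncaughtExceptionHandler((thread, error) ->
                lastFailure.set(thread.getName() + ": " + error.getMessage()));
    }

    // Start a thread that dies with an exception and wait for it to finish.
    static void runFailingThread() {
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("boom");
        }, "worker-1");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        install();
        runFailingThread();
        System.out.println(lastFailure.get());
    }
}
```

    The handler runs on the dying thread before it terminates, so the recorded message is visible once join() returns.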

    Find the INSTANCE.start() call near the end of the try block and step into it. It in turn calls the start method of Node:

        private void start() throws NodeValidationException {
            node.start();
            keepAliveThread.start();
        }
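
    The keepAliveThread started here exists so that the JVM does not exit once the main thread returns. A common way to build such a thread is a non-daemon thread blocked on a CountDownLatch that is released at shutdown. The sketch below illustrates that pattern under that assumption; KeepAliveSketch and its methods are hypothetical names, and the demo releases the latch itself, where a server would typically do so from a shutdown hook.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of a keep-alive thread parked on a latch.
class KeepAliveSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final Thread keepAlive;

    KeepAliveSketch() {
        keepAlive = new Thread(() -> {
            try {
                latch.await(); // park until stop() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // bail out
            }
        }, "keepAlive");
        // A non-daemon thread prevents the JVM from exiting while it is alive.
        keepAlive.setDaemon(false);
    }

    void start() {
        keepAlive.start();
    }

    void stop() {
        latch.countDown();
    }

    boolean isAlive() {
        return keepAlive.isAlive();
    }

    void awaitTermination() {
        try {
            keepAlive.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        KeepAliveSketch sketch = new KeepAliveSketch();
        sketch.start();
        System.out.println("alive after start: " + sketch.isAlive());
        sketch.stop(); // a server would call this from a shutdown hook instead
        sketch.awaitTermination();
        System.out.println("alive after stop: " + sketch.isAlive());
    }
}
```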

    4. Node startup: the Node class

    The start method:

      /**
         * Start the node. If the node is already started, this method is no-op.
         */
        public Node start() throws NodeValidationException {
            if (!lifecycle.moveToStarted()) {
                return this;
            }
    
            logger.info("starting ...");
            pluginLifecycleComponents.forEach(LifecycleComponent::start);
    
            injector.getInstance(MappingUpdatedAction.class).setClient(client);
            injector.getInstance(IndicesService.class).start();
            injector.getInstance(IndicesClusterStateService.class).start();
            injector.getInstance(SnapshotsService.class).start();
            injector.getInstance(SnapshotShardsService.class).start();
            injector.getInstance(RoutingService.class).start();
            injector.getInstance(SearchService.class).start();
            nodeService.getMonitorService().start();
    
            final ClusterService clusterService = injector.getInstance(ClusterService.class);
    
            final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
            nodeConnectionsService.start();
            clusterService.setNodeConnectionsService(nodeConnectionsService);
    
            injector.getInstance(ResourceWatcherService.class).start();
            injector.getInstance(GatewayService.class).start();
            Discovery discovery = injector.getInstance(Discovery.class);
            clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
    
            // Start the transport service now so the publish address will be added to the local disco node in ClusterService
            TransportService transportService = injector.getInstance(TransportService.class);
            transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
            transportService.start();
            assert localNodeFactory.getNode() != null;
            assert transportService.getLocalNode().equals(localNodeFactory.getNode())
                : "transportService has a different local node than the factory provided";
            final MetaData onDiskMetadata;
            try {
                // we load the global state here (the persistent part of the cluster state stored on disk) to
                // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
                if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
                    onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
                } else {
                    onDiskMetadata = MetaData.EMPTY_META_DATA;
                }
                assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(), pluginsService
                .filterPlugins(Plugin
                .class)
                .stream()
                .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
    
            clusterService.addStateApplier(transportService.getTaskManager());
            // start after transport service so the local disco is known
            discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
            clusterService.start();
            assert clusterService.localNode().equals(localNodeFactory.getNode())
                : "clusterService has a different local node than the factory provided";
            transportService.acceptIncomingRequests();
            discovery.startInitialJoin();
            final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
            if (initialStateTimeout.millis() > 0) {
                final ThreadPool thread = injector.getInstance(ThreadPool.class);
                ClusterState clusterState = clusterService.state();
                ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
                if (clusterState.nodes().getMasterNodeId() == null) {
                    logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
                    final CountDownLatch latch = new CountDownLatch(1);
                    observer.waitForNextChange(new ClusterStateObserver.Listener() {
                        @Override
                        public void onNewClusterState(ClusterState state) { latch.countDown(); }
    
                        @Override
                        public void onClusterServiceClose() {
                            latch.countDown();
                        }
    
                        @Override
                        public void onTimeout(TimeValue timeout) {
                            logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                                initialStateTimeout);
                            latch.countDown();
                        }
                    }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
    
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
                    }
                }
            }
    
            injector.getInstance(HttpServerTransport.class).start();
    
            if (WRITE_PORTS_FILE_SETTING.get(settings)) {
                TransportService transport = injector.getInstance(TransportService.class);
                writePortsFile("transport", transport.boundAddress());
                HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
                writePortsFile("http", http.boundAddress());
            }
    
            logger.info("started");
    
            pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
    
            return this;
        }
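
    The first lines of start() make it idempotent: if lifecycle.moveToStarted() returns false, the node is already started and the method returns without starting anything again. The guard can be sketched as a small state machine. LifecycleSketch below is a hypothetical simplification written for illustration; its states and messages are assumptions, not the actual Lifecycle class.

```java
// Hypothetical sketch of an idempotent start guarded by a lifecycle state.
class LifecycleSketch {
    enum State { INITIALIZED, STARTED, STOPPED, CLOSED }

    private State state = State.INITIALIZED;
    int startCount = 0; // counts how often the start logic really ran

    // Returns true only when this call performed the transition to STARTED.
    synchronized boolean moveToStarted() {
        if (state == State.STARTED) {
            return false;
        }
        if (state == State.CLOSED) {
            throw new IllegalStateException("cannot start a closed component");
        }
        state = State.STARTED;
        return true;
    }

    // Mirrors the shape of start(): return early if already started.
    LifecycleSketch start() {
        if (!moveToStarted()) {
            return this;
        }
        startCount++; // the services would be started here, in order
        return this;
    }

    public static void main(String[] args) {
        LifecycleSketch node = new LifecycleSketch();
        node.start().start();
        System.out.println("start logic ran " + node.startCount + " time(s)");
    }
}
```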

    A very important object in this method is the Injector. Here is its definition, as given in its class-level Javadoc:

    /**
     * Builds the graphs of objects that make up your application. The injector tracks the dependencies
     * for each type and uses bindings to inject them. This is the core of Guice, although you rarely
     * interact with it directly. This "behind-the-scenes" operation is what distinguishes dependency
     * injection from its cousin, the service locator pattern.
     * <p>
     * Contains several default bindings:
     * <ul>
     * <li>This {@link Injector} instance itself
     * <li>A {@code Provider<T>} for each binding of type {@code T}
     * <li>The {@link java.util.logging.Logger} for the class being injected
     * <li>The {@link Stage} in which the Injector was created
     * </ul>
     * <p>
     * Injectors are created using the facade class {@link Guice}.
     * <p>
     * An injector can also {@link #injectMembers(Object) inject the dependencies} of
     * already-constructed instances. This can be used to interoperate with objects created by other
     * frameworks or services.
     *
     * @author crazybob@google.com (Bob Lee)
     * @author jessewilson@google.com (Jesse Wilson)
     */

    Put simply, the Injector is an instance manager, comparable in function to the BeanFactory of Spring's IoC container.
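
    To make the "instance manager" idea concrete, here is a deliberately tiny sketch of a registry that maps a type to a provider and hands out one shared instance per type. It is not Guice and does not reproduce the Injector API beyond borrowing the name getInstance; TinyInjector, DemoTransport, and DemoSearch are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, minimal instance manager; real injectors do far more.
class TinyInjector {
    private final Map<Class<?>, Supplier<?>> bindings = new HashMap<>();
    private final Map<Class<?>, Object> singletons = new HashMap<>();

    // Register how to build an instance of the given type.
    <T> void bind(Class<T> type, Supplier<? extends T> provider) {
        bindings.put(type, provider);
    }

    // Return the shared instance, creating it on first request.
    <T> T getInstance(Class<T> type) {
        Object existing = singletons.get(type);
        if (existing == null) {
            Supplier<?> provider = bindings.get(type);
            if (provider == null) {
                throw new IllegalStateException("no binding for " + type.getName());
            }
            existing = provider.get(); // may recursively request dependencies
            singletons.put(type, existing);
        }
        return type.cast(existing);
    }

    // Demo services standing in for real ones.
    static class DemoTransport {
        boolean started;

        void start() {
            started = true;
        }
    }

    static class DemoSearch {
        final DemoTransport transport;

        DemoSearch(DemoTransport transport) {
            this.transport = transport;
        }
    }

    public static void main(String[] args) {
        TinyInjector injector = new TinyInjector();
        injector.bind(DemoTransport.class, DemoTransport::new);
        injector.bind(DemoSearch.class,
                () -> new DemoSearch(injector.getInstance(DemoTransport.class)));

        injector.getInstance(DemoTransport.class).start();
        // DemoSearch receives the same DemoTransport instance that was started.
        System.out.println(injector.getInstance(DemoSearch.class).transport.started);
    }
}
```

    Because every caller goes through getInstance, services such as those started in Node.start() can depend on each other without constructing their dependencies themselves.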

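    The services that need to be started are the ones that Node.start() obtains from the injector in the code above, including IndicesService, IndicesClusterStateService, SnapshotsService, SnapshotShardsService, RoutingService, SearchService, NodeConnectionsService, ResourceWatcherService, GatewayService, Discovery, TransportService, ClusterService, and HttpServerTransport.

    Later posts will analyze each of these services individually.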

  • Original article: https://www.cnblogs.com/davidwang456/p/10038579.html