zoukankan      html  css  js  c++  java
  • Elasticsearch 6 源码分析(2)：模块化管理

    Elasticsearch 里面的组件基本都是通过 Guice 的 Injector 来注入和获取实例，以此实现模块化管理。

    在 Node 的构造方法中：

    /**
         * Constructs a node
         *
         * <p>The constructor creates the node's services and modules by hand, binds the resulting instances
         * into Guice modules and finally creates the injector from those modules. Closeable resources
         * created along the way are collected in a local list and closed if construction does not
         * complete successfully.
         *
         * @param environment                the environment for this node
         * @param classpathPlugins           the plugins to be loaded from the classpath
         * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
         *                                   test framework for tests that rely on being able to set private settings
         * @throws ElasticsearchException if an {@link IOException} is raised while the node is being constructed
         */
        protected Node(
                final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
            logger = LogManager.getLogger(Node.class);
            final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
            boolean success = false;
            try {
                // keep the settings exactly as supplied, then layer the client type setting on top of them
                originalSettings = environment.settings();
                Settings tmpSettings = Settings.builder().put(environment.settings())
                    .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
    
                nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
                resourcesToClose.add(nodeEnvironment);
                logger.info("node name [{}], node ID [{}]",
                        NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId());
    
                // log version, build, OS and JVM details once at startup
                final JvmInfo jvmInfo = JvmInfo.jvmInfo();
                logger.info(
                    "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
                    Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
                    jvmInfo.pid(),
                    Build.CURRENT.flavor().displayName(),
                    Build.CURRENT.type().displayName(),
                    Build.CURRENT.shortHash(),
                    Build.CURRENT.date(),
                    Constants.OS_NAME,
                    Constants.OS_VERSION,
                    Constants.OS_ARCH,
                    Constants.JVM_VENDOR,
                    Constants.JVM_NAME,
                    Constants.JAVA_VERSION,
                    Constants.JVM_VERSION);
                logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
                warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
    
                if (logger.isDebugEnabled()) {
                    logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
                        environment.configFile(), Arrays.toString(environment.dataFiles()), environment.logsFile(), environment.pluginsFile());
                }
    
                // the plugins service supplies the updated settings that are used from here on
                this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
                this.settings = pluginsService.updatedSettings();
                localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
    
                // create the environment based on the finalized (processed) view of the settings
                // this is just to makes sure that people get the same settings, no matter where they ask them from
                this.environment = new Environment(this.settings, environment.configFile());
                Environment.assertEquivalent(environment, this.environment);
    
                // thread pool built with the executor builders obtained from the plugins service
                final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
    
                final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
                resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
                // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
                DeprecationLogger.setThreadContext(threadPool.getThreadContext());
                resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
    
                // settings and settings filters from plugins, plus the settings registered by each thread pool executor builder
                final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
                final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
                for (final ExecutorBuilder<?> builder : threadPool.builders()) {
                    additionalSettings.addAll(builder.getRegisteredSettings());
                }
                client = new NodeClient(settings, threadPool);
                final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
                final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
                AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
                // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
                // so we might be late here already
    
                // setting upgraders from all plugins, flattened into a single set
                final Set<SettingUpgrader<?>> settingsUpgraders = pluginsService.filterPlugins(Plugin.class)
                        .stream()
                        .map(Plugin::getSettingUpgraders)
                        .flatMap(List::stream)
                        .collect(Collectors.toSet());
    
                final SettingsModule settingsModule =
                        new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);
                scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
                resourcesToClose.add(resourceWatcherService);
                final NetworkService networkService = new NetworkService(
                    getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
    
                List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
                final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
                clusterService.addStateApplier(scriptModule.getScriptService());
                resourcesToClose.add(clusterService);
                final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
                    scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
                // the disk threshold monitor is handed to the cluster info service as its onNewInfo callback
                final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
                    clusterService.getClusterSettings(), client);
                final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
                    listener::onNewInfo);
                final UsageService usageService = new UsageService(settings);
    
                ModulesBuilder modules = new ModulesBuilder();
                // plugin modules must be added here, before others or we can get crazy injection errors...
                for (Module pluginModule : pluginsService.createGuiceModules()) {
                    modules.add(pluginModule);
                }
                final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
                ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
                modules.add(clusterModule);
                IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
                modules.add(indicesModule);
    
                // note: searchModule is only queried below; it is not added to the modules builder in this constructor
                SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
                CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
                    settingsModule.getClusterSettings());
                resourcesToClose.add(circuitBreakerService);
                modules.add(new GatewayModule());
    
    
                PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
                BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
                resourcesToClose.add(bigArrays);
                modules.add(settingsModule);
                // named writeables from the network, indices, search and cluster modules and from every plugin, merged into one registry
                List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
                    NetworkModule.getNamedWriteables().stream(),
                    indicesModule.getNamedWriteables().stream(),
                    searchModule.getNamedWriteables().stream(),
                    pluginsService.filterPlugins(Plugin.class).stream()
                        .flatMap(p -> p.getNamedWriteables().stream()),
                    ClusterModule.getNamedWriteables().stream())
                    .flatMap(Function.identity()).collect(Collectors.toList());
                final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
                // same merge for the named x-content entries
                NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
                    NetworkModule.getNamedXContents().stream(),
                    indicesModule.getNamedXContents().stream(),
                    searchModule.getNamedXContents().stream(),
                    pluginsService.filterPlugins(Plugin.class).stream()
                        .flatMap(p -> p.getNamedXContent().stream()),
                    ClusterModule.getNamedXWriteables().stream())
                    .flatMap(Function.identity()).collect(toList()));
                modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
                final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
    
                // collect engine factory providers from server and from plugins
                final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
                final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =
                        Stream.concat(
                                indicesModule.getEngineFactories().stream(),
                                enginePlugins.stream().map(plugin -> plugin::getEngineFactory))
                        .collect(Collectors.toList());
    
    
                // index store factories from all index store plugins, keyed by the map keys the plugins supply
                // NOTE(review): Collectors.toMap without a merge function throws IllegalStateException on duplicate keys,
                // so two plugins supplying the same key would fail here - confirm this is the intended behaviour
                final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories =
                        pluginsService.filterPlugins(IndexStorePlugin.class)
                                .stream()
                                .map(IndexStorePlugin::getIndexStoreFactories)
                                .flatMap(m -> m.entrySet().stream())
                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    
                final IndicesService indicesService =
                        new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
                                clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
                                threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
                                scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders, indexStoreFactories);
    
                final AliasValidator aliasValidator = new AliasValidator(settings);
    
                // note: the unqualified "environment" here is the constructor parameter, not this.environment
                final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(
                        settings,
                        clusterService,
                        indicesService,
                        clusterModule.getAllocationService(),
                        aliasValidator,
                        environment,
                        settingsModule.getIndexScopedSettings(),
                        threadPool,
                        xContentRegistry,
                        forbidPrivateIndexSettings);
    
                // components created by the plugins themselves; each one is bound below under its runtime class
                Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
                    .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
                                                     scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,
                                                     namedWriteableRegistry).stream())
                    .collect(Collectors.toList());
    
                ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                    settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
                    threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
                modules.add(actionModule);
    
                final RestController restController = actionModule.getRestController();
                final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
                    threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
                    networkService, restController);
                // metadata upgraders contributed by plugins: custom metadata, index templates and index metadata
                Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
                    pluginsService.filterPlugins(Plugin.class).stream()
                        .map(Plugin::getCustomMetaDataUpgrader)
                        .collect(Collectors.toList());
                Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders =
                    pluginsService.filterPlugins(Plugin.class).stream()
                        .map(Plugin::getIndexTemplateMetaDataUpgrader)
                        .collect(Collectors.toList());
                Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
                        .map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
                final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
                final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,
                    indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders);
                final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService,
                    metaDataIndexUpgradeService, metaDataUpgrader);
                // the created instance is not kept in a variable
                // NOTE(review): presumably it registers itself with clusterService in its constructor - confirm
                new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
                final Transport transport = networkModule.getTransportSupplier().get();
                // task headers declared by action plugins, plus the X-Opaque-Id task header
                Set<String> taskHeaders = Stream.concat(
                    pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
                    Stream.of(Task.X_OPAQUE_ID)
                ).collect(Collectors.toSet());
                final TransportService transportService = newTransportService(settings, transport, threadPool,
                    networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
                final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService);
                final SearchTransportService searchTransportService =  new SearchTransportService(settings, transportService,
                    SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
                final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
    
                final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry,
                    networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
                    clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
                    clusterModule.getAllocationService(), environment.configFile());
                this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
                    transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
                    httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
                    searchTransportService);
    
                final SearchService searchService = newSearchService(clusterService, indicesService,
                    threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
                    responseCollectorService);
    
                // persistent task executors contributed by plugins, flattened into one list
                final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
                    .filterPlugins(PersistentTaskPlugin.class).stream()
                    .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client))
                    .flatMap(List::stream)
                    .collect(toList());
    
                final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
                final PersistentTasksClusterService persistentTasksClusterService =
                    new PersistentTasksClusterService(settings, registry, clusterService);
                final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);
    
                // bind the instances constructed above so that they can be obtained from the injector
                modules.add(b -> {
                        b.bind(Node.class).toInstance(this);
                        b.bind(NodeService.class).toInstance(nodeService);
                        b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
                        b.bind(PluginsService.class).toInstance(pluginsService);
                        // the same client instance is bound under both Client and NodeClient
                        b.bind(Client.class).toInstance(client);
                        b.bind(NodeClient.class).toInstance(client);
                        b.bind(Environment.class).toInstance(this.environment);
                        b.bind(ThreadPool.class).toInstance(threadPool);
                        b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
                        b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
                        b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
                        b.bind(BigArrays.class).toInstance(bigArrays);
                        b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
                        b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
                        b.bind(IngestService.class).toInstance(ingestService);
                        b.bind(UsageService.class).toInstance(usageService);
                        b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
                        b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
                        b.bind(MetaStateService.class).toInstance(metaStateService);
                        b.bind(IndicesService.class).toInstance(indicesService);
                        b.bind(AliasValidator.class).toInstance(aliasValidator);
                        b.bind(MetaDataCreateIndexService.class).toInstance(metaDataCreateIndexService);
                        b.bind(SearchService.class).toInstance(searchService);
                        b.bind(SearchTransportService.class).toInstance(searchTransportService);
                        b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings,
                            searchService::createReduceContext));
                        b.bind(Transport.class).toInstance(transport);
                        b.bind(TransportService.class).toInstance(transportService);
                        b.bind(NetworkService.class).toInstance(networkService);
                        b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
                        b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
                        b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
                        b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
                        b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
                        // recoverySettings is shared by the two peer recovery services only, hence the local scope
                        {
                            RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
                            processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
                            b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,
                                    indicesService, recoverySettings));
                            b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(settings, threadPool,
                                    transportService, recoverySettings, clusterService));
                        }
                        b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
                        // each plugin component is bound under its own runtime class (via a raw Class cast)
                        pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
                        b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
                        b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
                        b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
                    }
                );
                injector = modules.createInjector();
    
                // TODO hack around circular dependencies problems in AllocationService
                clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));
    
                // lifecycle components: plugin components implementing LifecycleComponent, plus the plugins'
                // Guice service classes resolved through the injector
                List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
                    .filter(p -> p instanceof LifecycleComponent)
                    .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
                pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
                    .map(injector::getInstance).collect(Collectors.toList()));
                resourcesToClose.addAll(pluginLifecycleComponents);
                this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
                // the client receives the action-to-transport-action map from the injector
                client.initialize(injector.getInstance(new Key<Map<Action, TransportAction>>() {}),
                        () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
    
                logger.debug("initializing HTTP handlers ...");
                actionModule.initRestHandlers(() -> clusterService.state().nodes());
                logger.info("initialized");
    
                // construction completed; the finally block below must not close anything
                success = true;
            } catch (IOException ex) {
                throw new ElasticsearchException("failed to bind service", ex);
            } finally {
                // on any failure (including unchecked exceptions) close everything registered so far
                if (!success) {
                    IOUtils.closeWhileHandlingException(resourcesToClose);
                }
            }
        }

    涉及的主要模块

     

    上图的文本如下：

    ClusterModule
    GatewayAllocator
    AllocationService
    ClusterService
    NodeConnectionsService
    MetaDataDeleteIndexService
    MetaDataIndexStateService
    MetaDataMappingService
    MetaDataIndexAliasesService
    MetaDataUpdateSettingsService
    MetaDataIndexTemplateService
    IndexNameExpressionResolver
    RoutingService
    DelayedAllocationService
    ShardStateAction
    NodeMappingRefreshAction
    MappingUpdatedAction
    TaskResultsService
    AllocationDeciders
    ShardsAllocator

    IndicesModule
    IndicesStore
    IndicesClusterStateService
    SyncedFlushService
    TransportNodesListShardStoreMetaData
    GlobalCheckpointSyncAction
    TransportResyncReplicationAction
    PrimaryReplicaSyncer

    其他
    Node
    NodeService
    NamedXContentRegistry
    PluginsService
    Client
    NodeClient
    Environment
    ThreadPool
    NodeEnvironment
    ResourceWatcherService
    CircuitBreakerService
    BigArrays
    ScriptService
    AnalysisRegistry
    IngestService
    UsageService
    NamedWriteableRegistry
    MetaDataUpgrader
    MetaStateService
    IndicesService
    AliasValidator
    MetaDataCreateIndexService
    SearchService
    SearchTransportService
    SearchPhaseController
    Transport
    TransportService
    NetworkService
    UpdateHelper
    MetaDataIndexUpgradeService
    ClusterInfoService
    GatewayMetaState
    Discovery
    PeerRecoverySourceService
    PeerRecoveryTargetService
    HttpServerTransport
    PersistentTasksService
    PersistentTasksClusterService
    PersistentTasksExecutorRegistry

    pluginModule

    GatewayModule
    DanglingIndicesState
    GatewayService
    TransportNodesListGatewayMetaState
    TransportNodesListGatewayStartedShards
    LocalAllocateDangledIndices

    SettingsModule
    Settings
    SettingsFilter
    ClusterSettings
    IndexScopedSettings

    ActionModule
    ActionFilters
    DestructiveOperations
    AutoCreateIndex
    TransportLivenessAction
    TransportAction
    supportAction

    RepositoriesModule
    RepositoriesService
    SnapshotsService
    SnapshotShardsService
    TransportNodesSnapshotsStatus
    RestoreService

  • 相关阅读:
    react hooks子给父传值
    npm安装依赖 and 删除依赖
    react 阻止事件冒泡
    http 500状态码
    vue中插槽slot的使用
    怎样在vue项目中使用axios处理接口请求
    GET 与 POST 其实没有什么区别
    LazyMan
    什么是微服务,什么是分布式
    思索 p5.js 的最佳实践
  • 原文地址:https://www.cnblogs.com/davidwang456/p/10039399.html
Copyright © 2011-2022 走看看