zoukankan      html  css  js  c++  java
  • elasticSearch6源码分析(12)DiscoveryModule

    1.DiscoveryModule概述

    /**
     * A module for loading classes for node discovery.
     */

    2.discovery 

    The discovery module is responsible for discovering nodes within a cluster, as well as electing a master node.
    
    Note, Elasticsearch is a peer to peer based system, nodes communicate with one another directly if operations are delegated / broadcast. All the main APIs (index, delete, search) do not communicate with the master node. The responsibility of the master node is to maintain the global cluster state, and act if nodes join or leave the cluster by reassigning shards. Each time a cluster state is changed, the state is made known to the other nodes in the cluster (the manner depends on the actual discovery implementation).

    调用情况Node.java

                final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry,
                    networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
                    clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
                    clusterModule.getAllocationService(), environment.configFile());
                this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
                    transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
                    httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
                    searchTransportService);

    Discovery接口

    /**
     * A pluggable module allowing to implement discovery of other nodes, publishing of the cluster
     * state to all nodes, electing a master of the cluster that raises cluster state change
     * events.
     */

    实现类有两个:

    2.1 SingleNodeDiscovery

    构造方法

        public SingleNodeDiscovery(final Settings settings, final TransportService transportService,
                                   final MasterService masterService, final ClusterApplier clusterApplier) {
            super(Objects.requireNonNull(settings));
            this.transportService = Objects.requireNonNull(transportService);
            masterService.setClusterStateSupplier(() -> clusterState);
            this.clusterApplier = clusterApplier;
        }

    启动方法

       @Override
        protected synchronized void doStart() {
            // set initial state
            DiscoveryNode localNode = transportService.getLocalNode();
            clusterState = createInitialState(localNode);
            clusterApplier.setInitialState(clusterState);
        }

    2.2 ZenDiscovery

    构造方法

     public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
                            NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
                            ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService,
                            Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators) {
            super(settings);
            this.onJoinValidators = addBuiltInJoinValidators(onJoinValidators);
            this.masterService = masterService;
            this.clusterApplier = clusterApplier;
            this.transportService = transportService;
            this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
            this.zenPing = newZenPing(settings, threadPool, transportService, hostsProvider);
            this.electMaster = new ElectMasterService(settings);
            this.pingTimeout = PING_TIMEOUT_SETTING.get(settings);
            this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
            this.joinRetryAttempts = JOIN_RETRY_ATTEMPTS_SETTING.get(settings);
            this.joinRetryDelay = JOIN_RETRY_DELAY_SETTING.get(settings);
            this.maxPingsFromAnotherMaster = MAX_PINGS_FROM_ANOTHER_MASTER_SETTING.get(settings);
            this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings);
            this.threadPool = threadPool;
            ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
            this.committedState = new AtomicReference<>();
    
            this.masterElectionIgnoreNonMasters = MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING.get(settings);
            this.masterElectionWaitForJoinsTimeout = MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.get(settings);
    
            logger.debug("using ping_timeout [{}], join.timeout [{}], master_election.ignore_non_master [{}]",
                    this.pingTimeout, joinTimeout, masterElectionIgnoreNonMasters);
    
            clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
                this::handleMinimumMasterNodesChanged, (value) -> {
                    final ClusterState clusterState = this.clusterState();
                    int masterNodes = clusterState.nodes().getMasterNodes().size();
                    // the purpose of this validation is to make sure that the master doesn't step down
                    // due to a change in master nodes, which also means that there is no way to revert
                    // an accidental change. Since we validate using the current cluster state (and
                    // not the one from which the settings come from) we have to be careful and only
                    // validate if the local node is already a master. Doing so all the time causes
                    // subtle issues. For example, a node that joins a cluster has no nodes in its
                    // current cluster state. When it receives a cluster state from the master with
                    // a dynamic minimum master nodes setting int it, we must make sure we don't reject
                    // it.
    
                    if (clusterState.nodes().isLocalNodeElectedMaster() && value > masterNodes) {
                        throw new IllegalArgumentException("cannot set "
                            + ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey() + " to more than the current" +
                            " master nodes count [" + masterNodes + "]");
                    }
            });
    
            this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this::clusterState, masterService, clusterName);
            this.masterFD.addListener(new MasterNodeFailureListener());
            this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, this::clusterState, clusterName);
            this.nodesFD.addListener(new NodeFaultDetectionListener());
            this.pendingStatesQueue = new PendingClusterStatesQueue(logger, MAX_PENDING_CLUSTER_STATES_SETTING.get(settings));
    
            this.publishClusterState =
                    new PublishClusterStateAction(
                            settings,
                            transportService,
                            namedWriteableRegistry,
                            this,
                            discoverySettings);
            this.membership = new MembershipAction(settings, transportService, new MembershipListener(), onJoinValidators);
            this.joinThreadControl = new JoinThreadControl();
    
            this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster, settings);
            this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger);
    
            masterService.setClusterStateSupplier(this::clusterState);
    
            transportService.registerRequestHandler(
                DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
        }

    启动方法

        @Override
        protected void doStart() {
            DiscoveryNode localNode = transportService.getLocalNode();
            assert localNode != null;
            synchronized (stateMutex) {
                // set initial state
                assert committedState.get() == null;
                assert localNode != null;
                ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
                ClusterState initialState = builder
                    .blocks(ClusterBlocks.builder()
                        .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
                        .addGlobalBlock(discoverySettings.getNoMasterBlock()))
                    .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()))
                    .build();
                committedState.set(initialState);
                clusterApplier.setInitialState(initialState);
                nodesFD.setLocalNode(localNode);
                joinThreadControl.start();
            }
            zenPing.start();
        }
  • 相关阅读:
    USB HID Report Descriptor 报告描述符详解
    USB 管道 && 端点
    USB描述符解析-->枚举.
    USB HID介绍
    USB2.0速度识别
    spring core
    好妈妈【第三章】一生受用的品格教育,不止孩子需要,父母也需要。
    好妈妈【第二章】把学习做成轻松的事,父母如何提升孩子的学习成绩
    好妈妈【第一章】提高爱的质量,小学前的儿童教育
    Java日志系统
  • 原文地址:https://www.cnblogs.com/davidwang456/p/10169058.html
Copyright © 2011-2022 走看看