  • ES master election strategy

    ES version 5.6.3

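    1. The whole process is triggered after the node starts. The start() method in Node.java calls doStart() in ZenDiscovery.java, and afterwards startInitialJoin() is called to either join an existing cluster or start a master election.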

    public void startInitialJoin() {
            // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
            clusterService.submitStateUpdateTask("initial_join", new LocalClusterUpdateTask() {
    
                @Override
                public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
                    // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
                    joinThreadControl.startNewThreadIfNotRunning();
                    return unchanged();
                }
    
                @Override
                public void onFailure(String source, @org.elasticsearch.common.Nullable Exception e) {
                    logger.warn("failed to start initial join process", e);
                }
            });
     }

    2. In ZenDiscovery, innerJoinCluster(), which runs on the thread started by startNewThreadIfNotRunning(), does the actual election work. Inside it, findMaster() selects the master node.

    private void innerJoinCluster() {
            DiscoveryNode masterNode = null;
            final Thread currentThread = Thread.currentThread();
            nodeJoinController.startElectionContext();
            while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
                masterNode = findMaster();
            }
    
           ......
        }
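    Steps 1 and 2 describe a pattern that can be sketched without ES: a guard that lets at most one join thread run (in the spirit of JoinThreadControl), and a loop that keeps calling findMaster() until it returns a node or the thread is deactivated. This is a minimal sketch under assumptions. The class JoinThreadControlSketch and all of its methods are hypothetical, a Supplier<String> stands in for findMaster(), and a plain Thread replaces the ES thread pool.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch, not ES code: at most one join thread at a time, and
// that thread keeps calling findMaster() until it finds a master or is stopped.
class JoinThreadControlSketch {
    private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();
    private final Supplier<String> findMaster; // stand-in for ZenDiscovery#findMaster()
    private volatile String electedMaster;

    JoinThreadControlSketch(Supplier<String> findMaster) {
        this.findMaster = findMaster;
    }

    boolean joinThreadActive(Thread thread) {
        return currentJoinThread.get() == thread;
    }

    /** Starts a join thread and returns it, or returns null if one is already registered. */
    Thread startNewThreadIfNotRunning() {
        Thread thread = new Thread(this::innerJoinCluster, "join-thread");
        if (!currentJoinThread.compareAndSet(null, thread)) {
            return null; // another join thread is running (or won the race)
        }
        thread.start();
        return thread;
    }

    /** Deregisters the thread; its loop exits at the next joinThreadActive() check. */
    void stopJoinThread(Thread thread) {
        currentJoinThread.compareAndSet(thread, null);
    }

    private void innerJoinCluster() {
        Thread current = Thread.currentThread();
        String master = null;
        // Same shape as the loop in innerJoinCluster(): ping again until a master is found.
        while (master == null && joinThreadActive(current)) {
            master = findMaster.get();
        }
        electedMaster = master;
        currentJoinThread.compareAndSet(current, null); // allow a future join thread
    }

    /** Waits for the given join thread and returns the master it found, or null. */
    String awaitMaster(Thread joinThread, long timeoutMillis) {
        try {
            joinThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return electedMaster;
    }
}
```

    Using compareAndSet means that when two callers race, only one thread gets registered. Clearing the reference with stopJoinThread() makes the loop exit at its next check, which appears to be the purpose of the joinThreadActive(currentThread) condition in the excerpt above.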

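    3. In findMaster(), pingAndWait() collects the nodes that can currently be reached by ping, together with their PingResponse objects. Each response contains the node's information and the master that node currently knows about. The election then works from the nodes collected this way.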

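      One setting matters here: discovery.zen.master_election.ignore_non_master_pings, which defaults to false. When it is set to true, ping responses from nodes that are not master-eligible (for example data nodes with node.master: false and node.data: true) are ignored during the election. Such nodes can never be elected themselves, but by default the master they report is still taken into account. When a cluster has only a few nodes there is usually no need to change this setting; in a large cluster, consider letting only master-eligible nodes take part in the election.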

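      Next, the pingResponses are used to determine whether a master already exists. The masters reported by the responding nodes (excluding the local node) are stored in activeMasters, and the master-eligible nodes are stored in masterCandidates.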

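      If activeMasters is empty, there is currently no master, so an election is held (step 4). Note discovery.zen.minimum_master_nodes here: the election can only proceed if the number of candidates is at least this value; otherwise findMaster() returns null and pinging starts again. The setting is usually configured as (N/2)+1, where N is the number of master-eligible nodes, to prevent split brain.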

      If activeMasters is not empty, the node joins an existing master, chosen as described in step 5.
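    The (N/2)+1 rule can be checked with a little arithmetic: after a network split, both sides would each need at least N/2+1 master-eligible nodes, which together is more than N. The sketch below uses hypothetical class and method names; it computes the quorum, mirrors the "enough candidates" check described above, and brute-forces every two-way split to confirm that at most one side can reach the quorum.

```java
// Hypothetical helper names; the formula is the (N/2)+1 rule from the text,
// where N is the number of master-eligible nodes.
class QuorumSketch {
    static int minimumMasterNodes(int masterEligibleNodes) {
        return masterEligibleNodes / 2 + 1; // integer division: 3 -> 2, 4 -> 3, 5 -> 3
    }

    /** The check described above: enough candidates seen to start an election? */
    static boolean hasEnoughCandidates(int candidatesSeen, int minimumMasterNodes) {
        return candidatesSeen >= minimumMasterNodes;
    }

    /** True if some two-way split of n nodes lets both sides reach the quorum. */
    static boolean splitBrainPossible(int n) {
        int quorum = minimumMasterNodes(n);
        for (int side = 0; side <= n; side++) {
            if (side >= quorum && n - side >= quorum) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int quorum = minimumMasterNodes(5);                 // 3
        System.out.println(hasEnoughCandidates(3, quorum)); // true: this side can elect
        System.out.println(hasEnoughCandidates(2, quorum)); // false: this side keeps pinging
        System.out.println(splitBrainPossible(5));          // false
    }
}
```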

     private DiscoveryNode findMaster() {
            logger.trace("starting to ping");
            List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
            if (fullPingResponses == null) {
                logger.trace("No full ping responses");
                return null;
            }
            if (logger.isTraceEnabled()) {
                StringBuilder sb = new StringBuilder();
                if (fullPingResponses.size() == 0) {
                    sb.append(" {none}");
                } else {
                    for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                        sb.append("\n\t--> ").append(pingResponse);
                    }
                }
                logger.trace("full ping responses:{}", sb);
            }
    
            final DiscoveryNode localNode = clusterService.localNode();
    
            // add our selves
            assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
                .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
    
            fullPingResponses.add(new ZenPing.PingResponse(localNode, null, clusterService.state()));
    
            // filter responses
            final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
    
            List<DiscoveryNode> activeMasters = new ArrayList<>();
            for (ZenPing.PingResponse pingResponse : pingResponses) {
                // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
                // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
                if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                    activeMasters.add(pingResponse.master());
                }
            }
    
            // nodes discovered during pinging
            List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
            for (ZenPing.PingResponse pingResponse : pingResponses) {
                if (pingResponse.node().isMasterNode()) {
                    masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
                }
            }
    
            if (activeMasters.isEmpty()) {
                if (electMaster.hasEnoughCandidates(masterCandidates)) {
                    final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                    logger.trace("candidate {} won election", winner);
                    return winner.getNode();
                } else {
                    // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                    logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                                masterCandidates, electMaster.minimumMasterNodes());
                    return null;
                }
            } else {
                assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
                // lets tie break between discovered nodes
                return electMaster.tieBreakActiveMasters(activeMasters);
            }
        }
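    The decision at the end of findMaster() can be modeled without any ES types. In this hypothetical sketch a Ping carries the responder's id, whether it is master-eligible, and which master it currently follows; as in findMaster(), the caller is assumed to include the local node's own ping with no known master. The sketch only reports what should happen next; choosing among the candidates or active masters is left to steps 4 and 5.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the decision at the end of findMaster(); not ES types.
class FindMasterSketch {
    static final class Ping {
        final String nodeId;
        final boolean masterEligible;
        final String knownMaster; // master this node currently follows, or null

        Ping(String nodeId, boolean masterEligible, String knownMaster) {
            this.nodeId = nodeId;
            this.masterEligible = masterEligible;
            this.knownMaster = knownMaster;
        }
    }

    enum Outcome { JOIN_ACTIVE_MASTER, ELECT_FROM_CANDIDATES, PING_AGAIN }

    static final class Decision {
        final Outcome outcome;
        final List<String> nodes; // active masters or candidates, depending on outcome

        Decision(Outcome outcome, List<String> nodes) {
            this.outcome = outcome;
            this.nodes = nodes;
        }
    }

    static Decision decide(List<Ping> pings, String localNodeId,
                           boolean ignoreNonMasterPings, int minimumMasterNodes) {
        List<String> activeMasters = new ArrayList<>();
        List<String> candidates = new ArrayList<>();
        for (Ping ping : pings) {
            // ignore_non_master_pings = true drops responses from non-master-eligible nodes
            if (ignoreNonMasterPings && !ping.masterEligible) {
                continue;
            }
            // a master reported by others counts, but never the local node itself
            if (ping.knownMaster != null && !ping.knownMaster.equals(localNodeId)) {
                activeMasters.add(ping.knownMaster);
            }
            if (ping.masterEligible) {
                candidates.add(ping.nodeId);
            }
        }
        if (!activeMasters.isEmpty()) {
            return new Decision(Outcome.JOIN_ACTIVE_MASTER, activeMasters); // step 5
        }
        if (!candidates.isEmpty() && candidates.size() >= minimumMasterNodes) {
            return new Decision(Outcome.ELECT_FROM_CANDIDATES, candidates); // step 4
        }
        return new Decision(Outcome.PING_AGAIN, candidates); // too few master-eligible nodes
    }
}
```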

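    4. electMaster() in ElectMasterService.java implements the election itself. The logic is very simple: the current candidates are sorted, and the first one after sorting becomes the master.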

    public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
            assert hasEnoughCandidates(candidates);
            List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
            sortedCandidates.sort(MasterCandidate::compare);
            return sortedCandidates.get(0);
        }

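    5. If a master already exists, the node joins that existing cluster. If several active masters are reported, the one that sorts first is chosen as the cluster to join.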

    public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
            return activeMasters.stream().min(ElectMasterService::compareNodes).get();
        }
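    Steps 4 and 5 both come down to an ordering. The text only says that the candidates are sorted; as I read the 5.x source, MasterCandidate.compare puts the higher cluster state version first and then falls back to compareNodes, which puts master-eligible nodes first and then orders by node id. Treat that ordering as an assumption to verify against ElectMasterService. The Node and Candidate types below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical model of the ordering behind electMaster() and
// tieBreakActiveMasters(). ASSUMED order: higher cluster state version first,
// then master-eligible nodes before others, then the smaller node id.
class ElectionOrderSketch {
    static final class Node {
        final String id;
        final boolean masterEligible;

        Node(String id, boolean masterEligible) {
            this.id = id;
            this.masterEligible = masterEligible;
        }
    }

    static final class Candidate {
        final Node node;
        final long clusterStateVersion;

        Candidate(Node node, long clusterStateVersion) {
            this.node = node;
            this.clusterStateVersion = clusterStateVersion;
        }
    }

    static int compareNodes(Node a, Node b) {
        if (a.masterEligible != b.masterEligible) {
            return a.masterEligible ? -1 : 1; // master-eligible nodes sort first
        }
        return a.id.compareTo(b.id);          // then the smaller id
    }

    static int compareCandidates(Candidate a, Candidate b) {
        int byVersion = Long.compare(b.clusterStateVersion, a.clusterStateVersion); // newer state first
        return byVersion != 0 ? byVersion : compareNodes(a.node, b.node);
    }

    /** Step 4: sort the candidates and take the first one. */
    static Candidate electMaster(Collection<Candidate> candidates) {
        List<Candidate> sorted = new ArrayList<>(candidates);
        sorted.sort(ElectionOrderSketch::compareCandidates);
        return sorted.get(0);
    }

    /** Step 5: among several reported masters, pick the one that sorts first. */
    static Node tieBreakActiveMasters(Collection<Node> activeMasters) {
        return activeMasters.stream().min(ElectionOrderSketch::compareNodes).get();
    }
}
```

    Preferring the newest cluster state presumably keeps the most up-to-date metadata, and the id comparison makes the result deterministic: every node that sees the same candidates picks the same winner.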

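    6. Once the master has been chosen, the cluster has to be formed. If the chosen master is the local node, it waits for the other nodes to join it. I am not entirely sure about this part; it is implemented asynchronously. Roughly: at startup every node registers a handler for the internal:discovery/zen/join request, and once a node has become master, the other nodes send it this request to join its cluster. A listener is registered in MembershipListener.java; when a node joins, the join is recorded asynchronously in ElectionContext, an inner class of NodeJoinController, which checks whether enough nodes have joined and therefore whether this round of election succeeded. This logic is rather convoluted and I am not sure my understanding is correct; if anyone knows it well, corrections are very welcome!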
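    The counting described in step 6 can be sketched as follows, under stated assumptions: the elected node counts itself, so it waits for minimum_master_nodes - 1 joins from other master-eligible nodes; repeated joins from the same node count once; and the election fails if the count is not reached within a timeout. ElectionContextSketch is hypothetical and far simpler than the real NodeJoinController.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, far simpler than NodeJoinController's ElectionContext.
// ASSUMPTION: the elected node counts itself, so it needs
// minimum_master_nodes - 1 joins from other master-eligible nodes.
class ElectionContextSketch {
    private final int requiredMasterJoins;
    private final Set<String> masterJoins = new HashSet<>();

    ElectionContextSketch(int minimumMasterNodes) {
        this.requiredMasterJoins = Math.max(0, minimumMasterNodes - 1);
    }

    /** Called when a join request arrives; repeats and non-master nodes don't count. */
    synchronized void onJoin(String nodeId, boolean masterEligible) {
        if (masterEligible && masterJoins.add(nodeId)) {
            notifyAll(); // wake up awaitElected() to re-check the count
        }
    }

    /** Blocks until enough master-eligible nodes joined; false on timeout or interrupt. */
    synchronized boolean awaitElected(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (masterJoins.size() < requiredMasterJoins) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // election failed; the caller would start a new round
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ElectionContextSketch ctx = new ElectionContextSketch(2); // e.g. 3 master-eligible nodes
        new Thread(() -> ctx.onJoin("node-b", true)).start();     // join arrives asynchronously
        System.out.println(ctx.awaitElected(2000));               // true
    }
}
```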

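    7. If the chosen master is not the local node, the node tries to join that cluster via joinElectedMaster() in ZenDiscovery.java. The number of attempts is controlled by discovery.zen.join_retry_attempts (default 3), and the timeout of each attempt by discovery.zen.join_timeout, whose default is 20 × discovery.zen.ping_timeout, i.e. 60 s with the default 3 s ping timeout. As the code below shows, a retry only happens when the target answers with NotMasterException; any other failure, including a timeout, ends the attempt immediately. So join_timeout should not be set too high, otherwise it takes a long time before a failed join is detected. If joining the master fails or times out, a new round of election starts, and this repeats until a master that satisfies the conditions is elected.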

    private boolean joinElectedMaster(DiscoveryNode masterNode) {
            try {
                // first, make sure we can connect to the master
                transportService.connectToNode(masterNode);
            } catch (Exception e) {
                logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to connect to master [{}], retrying...", masterNode), e);
                return false;
            }
            int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
            while (true) {
                try {
                    logger.trace("joining master {}", masterNode);
                    membership.sendJoinRequestBlocking(masterNode, clusterService.localNode(), joinTimeout);
                    return true;
                } catch (Exception e) {
                    final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
                    if (unwrap instanceof NotMasterException) {
                        if (++joinAttempt == this.joinRetryAttempts) {
                            logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                            return false;
                        } else {
                            logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                        }
                    } else {
                        if (logger.isTraceEnabled()) {
                            logger.trace((Supplier<?>) () -> new ParameterizedMessage("failed to send join request to master [{}]", masterNode), e);
                        } else {
                            logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(e));
                        }
                        return false;
                    }
                }
    
                try {
                    Thread.sleep(this.joinRetryDelay.millis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
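    The retry policy of joinElectedMaster() can be isolated into a small sketch: retry only when the target reports that it is not master yet, sleep between attempts, and give up immediately on any other error. NotMasterYetException is a hypothetical stand-in for NotMasterException, a Runnable stands in for sendJoinRequestBlocking(), and defaultJoinTimeoutMillis() merely encodes the 20 × ping_timeout rule from step 7.

```java
// Hypothetical sketch of the retry policy in joinElectedMaster(); not ES code.
class JoinRetrySketch {
    /** Stand-in for NotMasterException: the target is not (yet) master. */
    static class NotMasterYetException extends RuntimeException {
        NotMasterYetException(String message) {
            super(message);
        }
    }

    /** Default join timeout as described in step 7: 20 x ping_timeout. */
    static long defaultJoinTimeoutMillis(long pingTimeoutMillis) {
        return pingTimeoutMillis * 20;
    }

    /** Returns true once a join succeeds; false when the caller should re-elect. */
    static boolean joinWithRetries(Runnable sendJoinRequest, int retryAttempts, long retryDelayMillis) {
        int attempt = 0;
        while (true) {
            try {
                sendJoinRequest.run();
                return true;
            } catch (NotMasterYetException e) {
                if (++attempt >= retryAttempts) {
                    return false; // gave up after retryAttempts tries
                }
            } catch (RuntimeException e) {
                return false; // any other failure (e.g. a timeout): no retry
            }
            try {
                Thread.sleep(retryDelayMillis); // wait before asking the master again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        boolean joined = joinWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new NotMasterYetException("master not ready");
            }
        }, 3, 100);
        System.out.println(joined + " after " + calls[0] + " attempts"); // true after 3 attempts
        System.out.println(defaultJoinTimeoutMillis(3000));             // 60000
    }
}
```

    Because a timeout is not retried, the worst case for a single joinElectedMaster() call is roughly one join_timeout, which is why a large join_timeout directly delays the next election round.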

    At this point master election is complete. That is roughly how it works; some of the details in between deserve closer study.

  • Original article: https://www.cnblogs.com/woniu4/p/9348664.html