zoukankan      html  css  js  c++  java
  • elasticsearch之discovery

    一、elasticsearch之discovery ping机制

    在es的设计中,一个集群必须有一个主节点(master node)。用来处理请求、索引的创建、修改、节点管理等。
    当有了master节点,该节点就要对各子节点进行周期性(心跳机制)的探测,保证整个集群的健康。
    主节点和各节点之间都会进行心跳检测,比如mater要确保各节点健康状况、是否宕机等,而子节点也要要确保master的健康状况,一旦master宕机,各子节点要重新选举新的master。这种相互间的心跳检测就是cluster的faultdetection。下图展示了faultdetection继承关系。

    faultdetection有两种实现方式,分别是master探测其他节点和其他节点对master的探测。faultdetection抽象了方法handleTransportDisconnect,该方法在内部类FDConnectionListener 中被调用。es中大量使用了listener的异步方式,因此可以大大的提升系统性能:

    Copy
    private class FDConnectionListener implements TransportConnectionListener {
            @Override
            public void onNodeConnected(DiscoveryNode node) {
            }
    
            @Override
            public void onNodeDisconnected(DiscoveryNode node) {
                handleTransportDisconnect(node);
            }
        }
    

    faultdetection启动时就会注册相应的FDConnectionListener,在周期性检测时,发现有节点失联,会通过onNodeDisconnected方法回调handleTransportDisconnect进行处理。先来看masterFaultdetection的启动代码:

    Copy
    private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
                this.retryCount = 0;
                this.notifiedMasterFailure.set(false);
    
                // 尝试连接master节点
                try {
                    transportService.connectToNode(masterNode);
                } catch (final Exception e) {
                    // 连接失败通知masterNode失败
    
                    notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
                    return;
                }
            //关闭之前的masterping,重启新的masterping
                if (masterPinger != null) {
                    masterPinger.stop();
                }
                this.masterPinger = new MasterPinger();
    
                // 周期之后启动masterPing,这里并没有周期启动masterPing,只是设定了延迟时间。
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
            }
    

    再来看master连接失败的处理逻辑:

    Copy
    private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
            if (notifiedMasterFailure.compareAndSet(false, true)) {
                threadPool.generic().execute(new Runnable() {
                    @Override
                    public void run() {
                //通知所有listener master丢失
                        for (Listener listener : listeners) {
                            listener.onMasterFailure(masterNode, reason);
                        }
                    }
                });
                stop("master failure, " + reason);
            }
        }
    

    zen discovery机制实现了listener.onMasterFailure接口,处理master失联的相关问题。下面是部分示例代码:

    Copy
    private class MasterPinger implements Runnable {
    
            private volatile boolean running = true;
    
            public void stop() {
                this.running = false;
            }
    
            @Override
            public void run() {
                if (!running) {
                    // return and don't spawn...
                    return;
                }
                final DiscoveryNode masterToPing = masterNode;
       final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
                final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
                transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
    
                            @Override
                            public MasterPingResponseResponse newInstance() {
                                return new MasterPingResponseResponse();
                            }
    
                            @Override
                            public void handleResponse(MasterPingResponseResponse response) {
                                if (!running) {
                                    return;
                                }
                                // reset the counter, we got a good result
                                MasterFaultDetection.this.retryCount = 0;
                                // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                    // 启动新的ping周期
                                    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                                }
                            }
    
                            @Override
                            public void handleException(TransportException exp) {
                                if (!running) {
                                    return;
                                }
                                synchronized (masterNodeMutex) {
                                    // check if the master node did not get switched on us...
                                    if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                        if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                            handleTransportDisconnect(masterToPing);
                                            return;
                                        } else if (exp.getCause() instanceof NoLongerMasterException) {
                                            logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                            notifyMasterFailure(masterToPing, "no longer master");
                                            return;
                                        } else if (exp.getCause() instanceof NotMasterException) {
                                            logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                            notifyMasterFailure(masterToPing, "not master");
                                            return;
                                        } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                            logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
                                            notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
                                            return;
                                        }
    
                                        int retryCount = ++MasterFaultDetection.this.retryCount;
                                        logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
                                        if (retryCount >= pingRetryCount) {
                                            logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
                                            // not good, failure
                                            notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                        } else {
                                             // resend the request, not reschedule, rely on send timeout
                                            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                        }
                                    }
                                }
                            }
    
                );
            }
        }
    

    masterPing是一个线程,在innerStart的方法中没有设定周期启动masterPing,但是由于masterPing需要进行心跳检测,这个问题就交给了上例的run方法。如果ping成功就会重启一个新的ping,这样既保证了ping线程的唯一性同时也保证了ping的顺序和间隔。ping的方式同样是通过transport发送一个masterPingRequest进行连接,节点收到该请求后,如果该节点已不再是master就会抛出一个NotMasterException。否则会响应notifyMasterFailure方法。对于网络问题导致的无响应情况,会调用handleTransportDisconnect(masterToPing)方法处理:

    Copy
    protected void handleTransportDisconnect(DiscoveryNode node) {
        //这里需要同步
            synchronized (masterNodeMutex) {
            //master 已经换成其它节点,就没必要再连接
                if (!node.equals(this.masterNode)) {
                    return;
                }
                if (connectOnNetworkDisconnect) {
                    try {
                //尝试再次连接
                        transportService.connectToNode(node);
                        // if all is well, make sure we restart the pinger
                        if (masterPinger != null) {
                            masterPinger.stop();
                        }
                //连接成功启动新的masterping
                        this.masterPinger = new MasterPinger();
                        // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                        threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
                    } catch (Exception e) {
                //连接出现异常,启动master节点丢失通知
                        logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                        notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
                    }
                } else {
              //不需要重连,通知master丢失。
                    logger.trace("[master] [{}] transport disconnected", node);
                    notifyMasterFailure(node, "transport disconnected");
                }
            }
        }
    

    就是masterfaultDetection的整个流程:
    启动中如果master节点失联则通知节点丢失,否则在一定延迟(3s)后启动masterPingmasterPing线程尝试连接master节点,如果master节点仍然失联,则再次尝试连接。master节点收到masterPingRequest请求后首先看一下自己还是不是master,如果不是则抛出异常,否则正常回应。节点如果收到响应式异常则启动master丢失通知,否则此次ping结束。在一定时间后重新启动新的masterPing线程。
    这里只是说master的faultdetection,而node的faultdetection跟master逻辑相似。区别主要在于ping异常处理上。
    在node的faultdetection中,当某个node出现异常或者没有响应,会启动node丢失机制,只是具体的处理逻辑不同。


    欢迎斧正,that's all see also:[cluster discovery概述及FaultDetection分析](https://www.cnblogs.com/zziawanblog/p/6533731.html)

    二、elasticsearch之discovery节点探测

    在es的设计中,一个集群必须有一个主节点(master node)。用来处理请求、索引的创建、修改、节点管理等。
    当有了master节点,该节点就要对各子节点进行周期性(心跳机制)的探测,保证整个集群的健康。
    主节点和各节点之间都会进行心跳检测,比如mater要确保各节点健康状况、是否宕机等,而子节点也要要确保master的健康状况,一旦master宕机,各子节点要重新选举新的master。这种相互间的心跳检测就是cluster的faultdetection。下图展示了faultdetection继承关系。

    faultdetection有两种实现方式,分别是master探测其他节点和其他节点对master的探测。faultdetection抽象了方法handleTransportDisconnect,该方法在内部类FDConnectionListener 中被调用。es中大量使用了listener的异步方式,因此可以大大的提升系统性能:

    Copy
    private class FDConnectionListener implements TransportConnectionListener {
            @Override
            public void onNodeConnected(DiscoveryNode node) {
            }
    
            @Override
            public void onNodeDisconnected(DiscoveryNode node) {
                handleTransportDisconnect(node);
            }
        }
    

    faultdetection启动时就会注册相应的FDConnectionListener,在周期性检测时,发现有节点失联,会通过onNodeDisconnected方法回调handleTransportDisconnect进行处理。先来看masterFaultdetection的启动代码:

    Copy
    private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
                this.retryCount = 0;
                this.notifiedMasterFailure.set(false);
    
                // 尝试连接master节点
                try {
                    transportService.connectToNode(masterNode);
                } catch (final Exception e) {
                    // 连接失败通知masterNode失败
    
                    notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
                    return;
                }
            //关闭之前的masterping,重启新的masterping
                if (masterPinger != null) {
                    masterPinger.stop();
                }
                this.masterPinger = new MasterPinger();
    
                // 周期之后启动masterPing,这里并没有周期启动masterPing,只是设定了延迟时间。
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
            }
    

    再来看master连接失败的处理逻辑:

    Copy
    private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
            if (notifiedMasterFailure.compareAndSet(false, true)) {
                threadPool.generic().execute(new Runnable() {
                    @Override
                    public void run() {
                //通知所有listener master丢失
                        for (Listener listener : listeners) {
                            listener.onMasterFailure(masterNode, reason);
                        }
                    }
                });
                stop("master failure, " + reason);
            }
        }
    

    zen discovery机制实现了listener.onMasterFailure接口,处理master失联的相关问题。下面是部分示例代码:

    Copy
    private class MasterPinger implements Runnable {
    
            private volatile boolean running = true;
    
            public void stop() {
                this.running = false;
            }
    
            @Override
            public void run() {
                if (!running) {
                    // return and don't spawn...
                    return;
                }
                final DiscoveryNode masterToPing = masterNode;
       final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
                final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
                transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
    
                            @Override
                            public MasterPingResponseResponse newInstance() {
                                return new MasterPingResponseResponse();
                            }
    
                            @Override
                            public void handleResponse(MasterPingResponseResponse response) {
                                if (!running) {
                                    return;
                                }
                                // reset the counter, we got a good result
                                MasterFaultDetection.this.retryCount = 0;
                                // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                    // 启动新的ping周期
                                    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                                }
                            }
    
                            @Override
                            public void handleException(TransportException exp) {
                                if (!running) {
                                    return;
                                }
                                synchronized (masterNodeMutex) {
                                    // check if the master node did not get switched on us...
                                    if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                        if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                            handleTransportDisconnect(masterToPing);
                                            return;
                                        } else if (exp.getCause() instanceof NoLongerMasterException) {
                                            logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                            notifyMasterFailure(masterToPing, "no longer master");
                                            return;
                                        } else if (exp.getCause() instanceof NotMasterException) {
                                            logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                            notifyMasterFailure(masterToPing, "not master");
                                            return;
                                        } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                            logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
                                            notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
                                            return;
                                        }
    
                                        int retryCount = ++MasterFaultDetection.this.retryCount;
                                        logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
                                        if (retryCount >= pingRetryCount) {
                                            logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
                                            // not good, failure
                                            notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                        } else {
                                             // resend the request, not reschedule, rely on send timeout
                                            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                        }
                                    }
                                }
                            }
    
                );
            }
        }
    

    masterPing是一个线程,在innerStart的方法中没有设定周期启动masterPing,但是由于masterPing需要进行心跳检测,这个问题就交给了上例的run方法。如果ping成功就会重启一个新的ping,这样既保证了ping线程的唯一性同时也保证了ping的顺序和间隔。ping的方式同样是通过transport发送一个masterPingRequest进行连接,节点收到该请求后,如果该节点已不再是master就会抛出一个NotMasterException。否则会响应notifyMasterFailure方法。对于网络问题导致的无响应情况,会调用handleTransportDisconnect(masterToPing)方法处理:

    Copy
    protected void handleTransportDisconnect(DiscoveryNode node) {
        //这里需要同步
            synchronized (masterNodeMutex) {
            //master 已经换成其它节点,就没必要再连接
                if (!node.equals(this.masterNode)) {
                    return;
                }
                if (connectOnNetworkDisconnect) {
                    try {
                //尝试再次连接
                        transportService.connectToNode(node);
                        // if all is well, make sure we restart the pinger
                        if (masterPinger != null) {
                            masterPinger.stop();
                        }
                //连接成功启动新的masterping
                        this.masterPinger = new MasterPinger();
                        // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                        threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
                    } catch (Exception e) {
                //连接出现异常,启动master节点丢失通知
                        logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                        notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
                    }
                } else {
              //不需要重连,通知master丢失。
                    logger.trace("[master] [{}] transport disconnected", node);
                    notifyMasterFailure(node, "transport disconnected");
                }
            }
        }
    

    就是masterfaultDetection的整个流程:
    启动中如果master节点失联则通知节点丢失,否则在一定延迟(3s)后启动masterPingmasterPing线程尝试连接master节点,如果master节点仍然失联,则再次尝试连接。master节点收到masterPingRequest请求后首先看一下自己还是不是master,如果不是则抛出异常,否则正常回应。节点如果收到响应式异常则启动master丢失通知,否则此次ping结束。在一定时间后重新启动新的masterPing线程。
    这里只是说master的faultdetection,而node的faultdetection跟master逻辑相似。区别主要在于ping异常处理上。
    在node的faultdetection中,当某个node出现异常或者没有响应,会启动node丢失机制,只是具体的处理逻辑不同。


    欢迎斧正,that's all see also:[cluster discovery概述及FaultDetection分析](https://www.cnblogs.com/zziawanblog/p/6533731.html)
  • 相关阅读:
    WorkerMan中php后端及时推送消息给客户端
    解析Laravel框架下的Contracts契约
    如何在 Laravel 中使用 Slack 进行异常通知
    用户异常与模拟异常的派发
    [IDA] 将变量索引进行计算
    [反汇编]栈帧中函数的参数传入位置
    用户模拟异常的记录
    [反汇编]函数开始部分利用mov ebx,esp找到返回地址(_KTRAP_FRAME结构)
    [反汇编] 获取上一个栈帧的ebp
    CPU异常分析(以trap00为例)
  • 原文地址:https://www.cnblogs.com/bubu99/p/13612796.html
Copyright © 2011-2022 走看看