zoukankan      html  css  js  c++  java
  • es的集群通信机制transport

    es的集群通信机制transport

    transport通信基础

    transport是集群间通信的基础,它有两种实现:

    • localtransport,主要用于jvm中的节点通信,因为在同一个jvm上的网络模拟,localtransport的实现也相对简单,但实际用处在es中有限。
    • nettytransport,一种基于netty实现的transport,同样用于节点间的通信。

    我们这里以nettytransport来展开。
    transport是集群通信的基本通道,无论是集群的状态信息,还是索引请求信息,都由transport传送。es定义了包括transport接口在内的所有基础接口,NettyTransport也实现了该接口。
    来简要的说下Netty的用法,Netty的使用依赖三个模块:

    • ServerBootStrap,启动服务。
    • ClientBootStrap,启动客户端并建立于服务端的连接。
    • MessageHandler,负责主要的业务逻辑。

    NettyTransport在doStart方法中调用ServerBootStrap和ClientBootStrap并绑定ip:

    Copy
    protected void doStart() throws ElasticsearchException {
           clientBootstrap = createClientBootstrap();//根据配置启动客户端
           //省略了无关分代码
        createServerBootstrap(name, mergedSettings);//启动server端
           bindServerBootstrap(name, mergedSettings);//绑定ip
            }
    

    bindServerBootstrap将本地ip绑定到netty同时设定好export host。然后启动client和server的过程将mergedSettings注入到channelpipeline中,至此启动过程结束,但需要注意的是,现在client端并未连接server端,这个连接过程是在节点启动后才进行连接。

    Copy
    public void connectToNode(DiscoveryNode node, boolean light) {
         //transport的模块必须要启动
            if (!lifecycle.started()) {
                throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
            }
         //获取读锁,每个节点可以和多个节点建立连接,因此这里用读锁
    
            globalLock.readLock().lock();
            try {
            //以node.id为基础获取一个锁,这保证对于每个node只能建立一次连接
                connectionLock.acquire(node.id());
                try {
                    if (!lifecycle.started()) {
                        throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
                    }
                    NodeChannels nodeChannels = connectedNodes.get(node);
                    if (nodeChannels != null) {
                        return;
                    }
                    try {
                        if (light) {//这里的light,就是对该节点只获取一个channel,所有类型(5种连接类型下面会说到)都使用者一个channel
                            nodeChannels = connectToChannelsLight(node);
                        } else {
                            nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
                            try {
                                connectToChannels(nodeChannels, node);
                            } catch (Throwable e) {
                                logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
                                nodeChannels.close();
                                throw e;
                            }
                        }
                        // we acquire a connection lock, so no way there is an existing connection
                        connectedNodes.put(node, nodeChannels);
                        if (logger.isDebugEnabled()) {
                            logger.debug("connected to node [{}]", node);
                        }
                        transportServiceAdapter.raiseNodeConnected(node);
                    } catch (ConnectTransportException e) {
                        throw e;
                    } catch (Exception e) {
                        throw new ConnectTransportException(node, "general node connection failure", e);
                    }
                } finally {
                    connectionLock.release(node.id());
                }
            } finally {
                globalLock.readLock().unlock();
            }
        }
    

    在每个server和client之间都有5个连接,每个连接承担着不同的任务:

    Copy
    protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
        //五种连接方式,不同的连接方式对应不同的集群操作
            ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
            ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
            ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
            ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
            ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
            InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
        //尝试建立连接
            for (int i = 0; i < connectRecovery.length; i++) {
                connectRecovery[i] = clientBootstrap.connect(address);
            }
            for (int i = 0; i < connectBulk.length; i++) {
                connectBulk[i] = clientBootstrap.connect(address);
            }
            for (int i = 0; i < connectReg.length; i++) {
                connectReg[i] = clientBootstrap.connect(address);
            }
            for (int i = 0; i < connectState.length; i++) {
                connectState[i] = clientBootstrap.connect(address);
            }
            for (int i = 0; i < connectPing.length; i++) {
                connectPing[i] = clientBootstrap.connect(address);
            }
        //获取每个连接的channel存入到相应的channels中便于后面使用。
            try {
                for (int i = 0; i < connectRecovery.length; i++) {
                    connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                    if (!connectRecovery[i].isSuccess()) {
                        throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
                    }
                    nodeChannels.recovery[i] = connectRecovery[i].getChannel();
                    nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
                }
    
                for (int i = 0; i < connectBulk.length; i++) {
                    connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                    if (!connectBulk[i].isSuccess()) {
                        throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
                    }
                    nodeChannels.bulk[i] = connectBulk[i].getChannel();
                    nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
                }
    
                for (int i = 0; i < connectReg.length; i++) {
                    connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                    if (!connectReg[i].isSuccess()) {
                        throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
                    }
                    nodeChannels.reg[i] = connectReg[i].getChannel();
                    nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
                }
    
                for (int i = 0; i < connectState.length; i++) {
                    connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                    if (!connectState[i].isSuccess()) {
                        throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
                    }
                    nodeChannels.state[i] = connectState[i].getChannel();
                    nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
                }
    
                for (int i = 0; i < connectPing.length; i++) {
                    connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
                    if (!connectPing[i].isSuccess()) {
                        throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
                    }
                    nodeChannels.ping[i] = connectPing[i].getChannel();
                    nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
                }
    
                if (nodeChannels.recovery.length == 0) {
                    if (nodeChannels.bulk.length > 0) {
                        nodeChannels.recovery = nodeChannels.bulk;
                    } else {
                        nodeChannels.recovery = nodeChannels.reg;
                    }
                }
                if (nodeChannels.bulk.length == 0) {
                    nodeChannels.bulk = nodeChannels.reg;
                }
            } catch (RuntimeException e) {
                // clean the futures
                for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
                    future.cancel();
                    if (future.getChannel() != null && future.getChannel().isOpen()) {
                        try {
                            future.getChannel().close();
                        } catch (Exception e1) {
                            // ignore
                        }
                    }
                }
                throw e;
            }
        }
    

    上例为节点建立连接的过程,每一对client和server间都会建立一定数量的不同连接,为什么要有这些不同的连接呢?是因为不同的操作消耗的资源也不同,请求的频率也不相同,对于资源消耗少、请求频率高的ping请求,可以多建立一些连接,来确保并发,对于资源消耗大的操作如bulk操作,则要少建立一些连接,防止机器负载过大可能导致节点失联。
    总的来说,当nettytransport的连接过程,就是分别启动client和server,同时将messagechandler。并且只有当节点启动时,client会连接server,获取集群信息,包括之前的5个连接。
    transport除了上述这些功能之外,还负责处理request相关请求。

    transport处理请求

    之前我们聊了transport的启动及连接,当这一切成功之后,transport还会负责处理请求。比如集群中master确认节点是否存在,节点获取集群的状态等。
    为了保证信息传输,es定义了一个19个字节长度的信息头:

    Copy
    HEADER_SIZE = 2 + 4 + 8 + 1 + 4
    

    ES开头,紧接着是4个字节的int类型信息长度,然后是8个字节的long类型的信息id,再是一个字节的status,最后是4个字节int类型的version。所有节点间的信息交互都以这个19个字节的头部开始。同时,es对于节点间的所有action都定义了名字,如对master的周期检测型action。每个action对应着相应的messagehandler:

    Copy
    public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
            //参数说明:node发送的目的节点,requestId请求id,action action名称,request请求,options包括以下几种操作 RECOVERY,BULK,REG,STATE,PING;
         Channel targetChannel = nodeChannel(node, options);//获取对应节点的channel,channel在连接节点时初始化完成(请参考上一篇)
    
            if (compress) {
                options.withCompress(true);
            }
    
            byte status = 0;
         //设置status 包括以下几种STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;
        status = TransportStatus.setRequest(status); 
         ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始写出流
            boolean addedReleaseListener = false;
            try {
                bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置
                StreamOutput stream = bStream;
                // only compress if asked, and, the request is not bytes, since then only
                // the header part is compressed, and the "body" can't be extracted as compressed
                if (options.compress() && (!(request instanceof BytesTransportRequest))) {
                    status = TransportStatus.setCompress(status);
                    stream = CompressorFactory.defaultCompressor().streamOutput(stream);
                }
                stream = new HandlesStreamOutput(stream);
    
                // we pick the smallest of the 2, to support both backward and forward compatibility
                // note, this is the only place we need to do this, since from here on, we use the serialized version
                // as the version to use also when the node receiving this request will send the response with
                Version version = Version.smallest(this.version, node.version());
    
                stream.setVersion(version);
                stream.writeString(transportServiceAdapter.action(action, version));
    
                ReleasableBytesReference bytes;
                ChannelBuffer buffer;
                // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
                // that create paged channel buffers, but its tricky to know when to do it (where this option is
                // more explicit).
                if (request instanceof BytesTransportRequest) {
                    BytesTransportRequest bRequest = (BytesTransportRequest) request;
                    assert node.version().equals(bRequest.version());
                    bRequest.writeThin(stream);
                    stream.close();
                    bytes = bStream.bytes();
                    ChannelBuffer headerBuffer = bytes.toChannelBuffer();
                    ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
                    buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
                } else {
                    request.writeTo(stream);
                    stream.close();
                    bytes = bStream.bytes();
                    buffer = bytes.toChannelBuffer();
                }
                NettyHeader.writeHeader(buffer, requestId, status, version);//写信息头
                ChannelFuture future = targetChannel.write(buffer);//写buffer同时获取future,发送信息发生在这里
                ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
                future.addListener(listener);//添加listener
                addedReleaseListener = true;
                transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
            } finally {
                if (!addedReleaseListener) {
                    Releasables.close(bStream.bytes());
                }
            }
        }
    

    上例展示了request的发送过程,获取目标node的channel封装请求写入信息头,然后发送并使用listener监听,这里的transportRequest是一个抽象类,继承了transportMessage并同时实现了streamable接口,各个功能都有相应的request。
    有发就有收,transport仍将接收交给message处理,而message则转交给messageHandler处理,因为nettytransport的信息处理逻辑都在messageCHannelHandler的messageReceived方法中:

    Copy
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            Transports.assertTransportThread();
            Object m = e.getMessage();
            if (!(m instanceof ChannelBuffer)) {//非buffer之间返回
                ctx.sendUpstream(e);
                return;
            }
         //解析message头
            ChannelBuffer buffer = (ChannelBuffer) m;
            int size = buffer.getInt(buffer.readerIndex() - 4);
            transportServiceAdapter.received(size + 6);
    
            // we have additional bytes to read, outside of the header
            boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;
    
            int markedReaderIndex = buffer.readerIndex();
            int expectedIndexReader = markedReaderIndex + size;
    
            // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
            // buffer, or in the cumlation buffer, which is cleaned each time
            StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
          //读取信息头中的几个重要元数据
            long requestId = buffer.readLong();
            byte status = buffer.readByte();
            Version version = Version.fromId(buffer.readInt());
    
            StreamInput wrappedStream;
          …………
            if (TransportStatus.isRequest(status)) {//处理请求
                String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
                if (buffer.readerIndex() != expectedIndexReader) {
                    if (buffer.readerIndex() < expectedIndexReader) {
                        logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
                    } else {
                        logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
                    }
                    buffer.readerIndex(expectedIndexReader);
                }
            } else {//处理响应
                TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
                // ignore if its null, the adapter logs it
                if (handler != null) {
                    if (TransportStatus.isError(status)) {
                        handlerResponseError(wrappedStream, handler);
                    } else {
                        handleResponse(ctx.getChannel(), wrappedStream, handler);
                    }
                } else {
                    // if its null, skip those bytes
                    buffer.readerIndex(markedReaderIndex + size);
                }
              …………
            wrappedStream.close();
        }
    

    上述代码展示了信息处理逻辑。
    那么,request和response是如何被处理的呢?先看request的处理代码:

    Copy
    protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
            final String action = buffer.readString();//读出action的名字
            transportServiceAdapter.onRequestReceived(requestId, action);
            final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
            try {
                final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);//获取处理该信息的handler
                if (handler == null) {
                    throw new ActionNotFoundTransportException(action);
                }
                final TransportRequest request = handler.newInstance();
                request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
                request.readFrom(buffer);
                if (handler.executor() == ThreadPool.Names.SAME) {
                    //noinspection unchecked
                    handler.messageReceived(request, transportChannel);//使用该handler处理信息。
                } else {
                    threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
                }
            } catch (Throwable e) {
                try {
                    transportChannel.sendResponse(e);
                } catch (IOException e1) {
                    logger.warn("Failed to send error message back to client for action [" + action + "]", e);
                    logger.warn("Actual Exception", e1);
                }
            }
            return action;
        }
    

    上例虽然在关键部分已经加了标注,但是仍不能看到请求是如何处理的,因为集群中存在各种请求,比如ping、discovery等等。因此要对应多种处理方式,所以,request最终被提交给handler处理。
    每个功能都有自己的handler,当request被提交handler时,会自动的调用相应的方法来处理。
    request的完整处理流程是:messageRecevied方法收到信息判断时,将request转发给transportServiceApdater的handler方法。handler根据请求类型分发给对应的方法处理。
    那response的处理流程是什么呢?response通过handlerResponse方法处理:

    Copy
    protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
            final TransportResponse response = handler.newInstance();
            response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
            response.remoteAddress();
            try {
                response.readFrom(buffer);
            } catch (Throwable e) {
                handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
                return;
            }
            try {
                if (handler.executor() == ThreadPool.Names.SAME) {
                    //noinspection unchecked
                    handler.handleResponse(response);//转发给对应的handler
                } else {
                    threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
                }
            } catch (Throwable e) {
                handleException(handler, new ResponseHandlerFailureTransportException(e));
            }
        }
    

    response的处理流程跟request类似。response也有对应的handler处理响应request。
    最后,来总结一下nettytransport的信息处理流程:

    • 信息通过request方法发送到目标节点。
    • 目标节点的messageHandler收到该信息,确定是request还是response,然后将它们转发给transportServicedAdapter,transportServicedAdapter根据request或response类型交给对应的handler处理并反馈。

    cluster discovery概述

    es的cluster实现了自己的发现(discovery)机制Zen,discovery的功能包括:

    • mater选举。
    • master错误探测。
    • cluster中节点探测。
    • 单播广播的ping。

    discovery是可配置模块,除了默认机制Zen,还有支持其他机制包括:

    • Azure classic discovery 插件方式,广播。
    • EC2 discovery 插件方式,广播。
    • Google Compute Engine (GCE) discovery 插件方式,广播。
    • Zen discovery 默认实现,广播/单播。

    我们可以根据各插件规则配置自己的发现机制。该机制通过实现guicediscoveryModule对外提供注册和启动。发现模块对外提供接口discoveryService。它本质上是一个discovery的一个代理,所有的功能最终都由所绑定的discovery实现。
    当节点启动时通过discoveryModule获取discoveryService并启动它。这也是其他机制的实现方式。通过discoveryModule对外提供绑定和获取,通过discoveryService接口对外提供功能。

    节点探测:discovery faultdetection

    在es的设计中,一个集群必须有一个主节点(master node)。用来处理请求、索引的创建、修改、节点管理等。
    当有了master节点,该节点就要对各子节点进行周期性(心跳机制)的探测,保证整个集群的健康。
    主节点和各节点之间都会进行心跳检测,比如mater要确保各节点健康状况、是否宕机等,而子节点也要要确保master的健康状况,一旦master宕机,各子节点要重新选举新的master。这种相互间的心跳检测就是cluster的faultdetection。下图展示了faultdetection继承关系。

    faultdetection有两种实现方式,分别是master探测其他节点和其他节点对master的探测。faultdetection抽象了方法handleTransportDisconnect,该方法在内部类FDConnectionListener 中被调用。es中大量使用了listener的异步方式,因此可以大大的提升系统性能:

    Copy
    private class FDConnectionListener implements TransportConnectionListener {
            @Override
            public void onNodeConnected(DiscoveryNode node) {
            }
    
            @Override
            public void onNodeDisconnected(DiscoveryNode node) {
                handleTransportDisconnect(node);
            }
        }
    

    faultdetection启动时就会注册相应的FDConnectionListener,在周期性检测时,发现有节点失联,会通过onNodeDisconnected方法回调handleTransportDisconnect进行处理。先来看masterFaultdetection的启动代码:

    Copy
    private void innerStart(final DiscoveryNode masterNode) {
        this.masterNode = masterNode;
                this.retryCount = 0;
                this.notifiedMasterFailure.set(false);
    
                // 尝试连接master节点
                try {
                    transportService.connectToNode(masterNode);
                } catch (final Exception e) {
                    // 连接失败通知masterNode失败
    
                    notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
                    return;
                }
            //关闭之前的masterping,重启新的masterping
                if (masterPinger != null) {
                    masterPinger.stop();
                }
                this.masterPinger = new MasterPinger();
    
                // 周期之后启动masterPing,这里并没有周期启动masterPing,只是设定了延迟时间。
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
            }
    

    再来看master连接失败的处理逻辑:

    Copy
    private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
            if (notifiedMasterFailure.compareAndSet(false, true)) {
                threadPool.generic().execute(new Runnable() {
                    @Override
                    public void run() {
                //通知所有listener master丢失
                        for (Listener listener : listeners) {
                            listener.onMasterFailure(masterNode, reason);
                        }
                    }
                });
                stop("master failure, " + reason);
            }
        }
    

    zen discovery机制实现了listener.onMasterFailure接口,处理master失联的相关问题。下面是部分示例代码:

    Copy
    private class MasterPinger implements Runnable {
    
            private volatile boolean running = true;
    
            public void stop() {
                this.running = false;
            }
    
            @Override
            public void run() {
                if (!running) {
                    // return and don't spawn...
                    return;
                }
                final DiscoveryNode masterToPing = masterNode;
       final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
                final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
                transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
    
                            @Override
                            public MasterPingResponseResponse newInstance() {
                                return new MasterPingResponseResponse();
                            }
    
                            @Override
                            public void handleResponse(MasterPingResponseResponse response) {
                                if (!running) {
                                    return;
                                }
                                // reset the counter, we got a good result
                                MasterFaultDetection.this.retryCount = 0;
                                // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                                if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                    // 启动新的ping周期
                                    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                                }
                            }
    
                            @Override
                            public void handleException(TransportException exp) {
                                if (!running) {
                                    return;
                                }
                                synchronized (masterNodeMutex) {
                                    // check if the master node did not get switched on us...
                                    if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                        if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                            handleTransportDisconnect(masterToPing);
                                            return;
                                        } else if (exp.getCause() instanceof NoLongerMasterException) {
                                            logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                            notifyMasterFailure(masterToPing, "no longer master");
                                            return;
                                        } else if (exp.getCause() instanceof NotMasterException) {
                                            logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                            notifyMasterFailure(masterToPing, "not master");
                                            return;
                                        } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                            logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
    
                                            notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
                                            return;
                                        }
    
                                        int retryCount = ++MasterFaultDetection.this.retryCount;
                                        logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
                                        if (retryCount >= pingRetryCount) {
                                            logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
                                            // not good, failure
                                            notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                                        } else {
                                             // resend the request, not reschedule, rely on send timeout
                                            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                                        }
                                    }
                                }
                            }
    
                );
            }
        }
    

    masterPing是一个线程,在innerStart的方法中没有设定周期启动masterPing,但是由于masterPing需要进行心跳检测,这个问题就交给了上例的run方法。如果ping成功就会重启一个新的ping,这样既保证了ping线程的唯一性同时也保证了ping的顺序和间隔。ping的方式同样是通过transport发送一个masterPingRequest进行连接,节点收到该请求后,如果该节点已不再是master就会抛出一个NotMasterException。否则会响应notifyMasterFailure方法。对于网络问题导致的无响应情况,会调用handleTransportDisconnect(masterToPing)方法处理:

    Copy
    protected void handleTransportDisconnect(DiscoveryNode node) {
        //这里需要同步
            synchronized (masterNodeMutex) {
            //master 已经换成其它节点,就没必要再连接
                if (!node.equals(this.masterNode)) {
                    return;
                }
                if (connectOnNetworkDisconnect) {
                    try {
                //尝试再次连接
                        transportService.connectToNode(node);
                        // if all is well, make sure we restart the pinger
                        if (masterPinger != null) {
                            masterPinger.stop();
                        }
                //连接成功启动新的masterping
                        this.masterPinger = new MasterPinger();
                        // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                        threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
                    } catch (Exception e) {
                //连接出现异常,启动master节点丢失通知
                        logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                        notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
                    }
                } else {
              //不需要重连,通知master丢失。
                    logger.trace("[master] [{}] transport disconnected", node);
                    notifyMasterFailure(node, "transport disconnected");
                }
            }
        }
    

    就是masterfaultDetection的整个流程:
    启动中如果master节点失联则通知节点丢失,否则在一定延迟(3s)后启动masterPingmasterPing线程尝试连接master节点,如果master节点仍然失联,则再次尝试连接。master节点收到masterPingRequest请求后首先看一下自己还是不是master,如果不是则抛出异常,否则正常回应。节点如果收到响应式异常则启动master丢失通知,否则此次ping结束。在一定时间后重新启动新的masterPing线程。
    这里只是说master的faultdetection,而node的faultdetection跟master逻辑相似。区别主要在于ping异常处理上。
    在node的faultdetection中,当某个node出现异常或者没有响应,会启动node丢失机制,只是具体的处理逻辑不同。

    discovery ping机制

    ping是es中集群发现的基本手段,通过在局域网中广播或者指定ping的某些节点(单播)获取集群信息和节点加入集群等操作。ZenDiscovery机制实现了两种ping机制:

    • 广播,当es实例启动的时候,它发送了广播的ping请求到地址224.2.2.4:54328。而其他的es实例使用同样的集群名称响应了这个请求。
    • 单播,各节点通过单播列表来发现彼此从而加入同一个集群。

    广播的原理很简单,当一个节点启动后向局域网发送广播信息,任何收到节点只要集群名称和该节点相同,就会对此广播作出回应。这样这个节点就能获取集群相关的信息。它定义了一个action:discovery/zen/multicast和广播的信息头INTERNAL_HEADER
    在之前说过,nettyTransport是cluster的通信基础,但是广播却没有使用 ,而是采用了java的multicastsocket。而multicastsocket是一个UDP的socket,用来进行多个数据包的广播。它将节点间的通信组成一个。任何multicastsocket都可以加入进来,组内的socket发送信息会被组内其他的节点接收。es将其进一步封装成multicastchannel
    mutlicastZenPing共定义了4个内部类,共同完成广播功能:

    • finalizingPingCollection是一个pingresponse的容器,用来存储所有的响应。
    • multicastPingResponseRequestHander是response处理类,类似于之前说的nettytransporthandler,这里虽然没有使用netty,但是也定义了一个messageReceived方法,当收到一个请求时直接返回一个response。
    • multicastPingResponse是一个响应类。
    • Received类处理消息逻辑,也是最重要的一个类。

    刚才说了,因为广播没有使用nettytransport,所以对于消息的逻辑处理都在received类中。在初始化的时候,multicastZenPing时会将received注册进去:

    Copy
    protected void doStart() throws ElasticsearchException {
            try {
                ....
                multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
                        new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
                        new Receiver());//将receiver注册到channel中
            } catch (Throwable t) {
              ....
            }
        }
    

    received类继承了listener,实现了3个方法,消息经过onMessage方法区分,如果是内部的ping则使用handlerNodePingRequest方法处理,否则使用handlerExternalPingRequest处理。那怎么区分这个消息到底是内部ping还是外部的ping呢?区分方法也很简单,就是读取消息中的关于INTERNAL_HEADER信息头,下面是nodePing的相关代码:

    Copy
    private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
               ....
                final DiscoveryNodes discoveryNodes = contextProvider.nodes();
                final DiscoveryNode requestingNode = requestingNodeX;
                if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
                    // 自身发出的ping,忽略
                    return;
                }
            //只接受本集群ping
                if (!requestClusterName.equals(clusterName)) {
                ...return;
                }
                // 两个client间不需要ping
                if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
                }
            //新建一个response
                final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
                multicastPingResponse.id = id;
                multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
            //无法连接的情况
                if (!transportService.nodeConnected(requestingNode)) {
                    // do the connect and send on a thread pool
                    threadPool.generic().execute(new Runnable() {
                        @Override
                        public void run() {
                            // connect to the node if possible
                            try {
                                transportService.connectToNode(requestingNode);
                                transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                                    @Override
                                    public void handleException(TransportException exp) {
                                        logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                                    }
                                });
                            } catch (Exception e) {
                                if (lifecycle.started()) {
                                    logger.warn("failed to connect to requesting node {}", e, requestingNode);
                                }
                            }
                        }
                    });
                } else {
                    transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                        @Override
                        public void handleException(TransportException exp) {
                            if (lifecycle.started()) {
                                logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                            }
                        }
                    });
                }
            }
        }
    

    上述代码描述了如何处理内部ping,接下来再说说如何处理来自外部的ping信息。当收到其他节点的响应信息后,它会把本节点及集群的master节点相关信息返回广播节点。这样广播节点就获知了集群信息。
    multicastZenPing类中还有一个类multicastPingResponseRequestHandler,它的作用是广播节点对于其他节点广播信息响应的回应。广播节点的第二次发送信息的过程,它跟其他transportRequestHandler一样有messageReceived方法。在启动时注册到transportserver中,只处理一类actioninternal:discovery/zen/multicast
    我们再来看ping请求的发送策略代码:

    Copy
    public void ping(final PingListener listener, final TimeValue timeout) {
           ....
        
        //产生一个id
            final int id = pingIdGenerator.incrementAndGet();
            try {
                receivedResponses.put(id, new PingCollection());
                sendPingRequest(id);//第一次发送ping请求
                // 等待时间的1/2后再次发送一个请求
                threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                    @Override
                    public void onFailure(Throwable t) {
                        logger.warn("[{}] failed to send second ping request", t, id);
                        finalizePingCycle(id, listener);
                    }
    
                    @Override
                    public void doRun() {
                        sendPingRequest(id);
                //再过1/2时间再次发送一个请求
                        threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                            @Override
                            public void onFailure(Throwable t) {
                                logger.warn("[{}] failed to send third ping request", t, id);
                                finalizePingCycle(id, listener);
                            }
    
                            @Override
                            public void doRun() {
                                // make one last ping, but finalize as soon as all nodes have responded or a timeout has past
                                PingCollection collection = receivedResponses.get(id);
                                FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
                                receivedResponses.put(id, finalizingPingCollection);
                                logger.trace("[{}] sending last pings", id);
                                sendPingRequest(id);
                    //最后一次发送请求,超时的1/4后
                                threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                                    @Override
                                    public void onFailure(Throwable t) {
                                        logger.warn("[{}] failed to finalize ping", t, id);
                                    }
    
                                    @Override
                                    protected void doRun() throws Exception {
                                        finalizePingCycle(id, listener);
                                    }
                                });
                            }
                        });
                    }
                });
            } catch (Exception e) {
                logger.warn("failed to ping", e);
                finalizePingCycle(id, listener);
            }
        }
    

    ping的过程主要是调用sendPingRequest(id)方法,在该方法中将id、版本、本地节点信息一起写入到BytesStreamOutput中,然后将其进行广播。这个广播信息会被其他机器上的Received接收并处理,并且响应ping请求。另外一个需要关注的是上述代码中注释的部分。它通过链式定期发送请求,在等待的时间内可能会发送4次请求,但这也带来了一些问题,这种发送方式会造成大量的ping请求重复,但幸运的是ping请求资源消耗较小。并且带来的好处也显而易见,因为这样尽可能的保证了在timeout的时间段内,集群新增节点都能收到这个ping信息,这种方式应用于单播发现中。
    简要来说,广播使用了java的multicastsocket,在timeout时间内发送4次ping请求,该请求包括一个id、信息头、本地节点信息。这些信息在被其他响应节点接收,交给Received处理,Received会将集群的master和本节点的相关信息通过transport返回给广播节点。广播节点收到这些信息后会立即使用transport返回一个空的response。至此一个广播过程完成。
    广播虽好,但我选择单播!因为当节点在分布在多个网段时,广播模式就失效了,因为广播信息不可达!这个时候就要使用单播去向指定的节点发送ping请求获取cluster的相关信息。这就是单播的用处与优点。
    单播使用的是nettytransport,它会使用跟广播一样,通过链式请求向指定的节点发送请求,信息的处理方式是nettytransport标准的信息处理过程。


    欢迎斧正,that's all 本文主要参考:[elasticsearch节点间通信的基础transport](https://www.cnblogs.com/zziawanblog/p/6523706.html) | [elasticsearch transport 请求发送和处理](https://www.cnblogs.com/zziawanblog/p/6528616.html) | [cluster discovery概述及FaultDetection分析](https://www.cnblogs.com/zziawanblog/p/6533731.html) | [zendiscovery 的Ping机制](https://www.cnblogs.com/zziawanblog/p/6551549.html)
  • 相关阅读:
    温故而知新-错误和异常处理
    温故而知新-面向对象的PHP
    Django框架之模板语法【转载】
    django2.0实现数据详情页展示的流程
    django2.0表的ORM字段类型和展示
    Fatal error: Cannot use object of type PHPExcel_RichText as array
    django2.0数据展示流程
    django2.0模板相关设置
    django2.0新增功能流程
    django2.0设置默认访问路由
  • 原文地址:https://www.cnblogs.com/bubu99/p/13599697.html
Copyright © 2011-2022 走看看