es的集群通信机制transport
transport通信基础
transport是集群间通信的基础,它有两种实现:
- localtransport,主要用于jvm中的节点通信,因为在同一个jvm上的网络模拟,localtransport的实现也相对简单,但实际用处在es中有限。
- nettytransport,一种基于netty实现的transport,同样用于节点间的通信。
我们这里以nettytransport来展开。
transport是集群通信的基本通道,无论是集群的状态信息,还是索引请求信息,都由transport传送。es定义了包括transport接口在内的所有基础接口,NettyTransport也实现了该接口。
来简要的说下Netty的用法,Netty的使用依赖三个模块:
- ServerBootStrap,启动服务。
- ClientBootStrap,启动客户端并建立于服务端的连接。
- MessageHandler,负责主要的业务逻辑。
NettyTransport在doStart方法中调用ServerBootStrap和ClientBootStrap并绑定ip:
protected void doStart() throws ElasticsearchException {
clientBootstrap = createClientBootstrap();//根据配置启动客户端
//省略了无关分代码
createServerBootstrap(name, mergedSettings);//启动server端
bindServerBootstrap(name, mergedSettings);//绑定ip
}
bindServerBootstrap将本地ip绑定到netty同时设定好export host。然后启动client和server的过程将mergedSettings注入到channelpipeline中,至此启动过程结束,但需要注意的是,现在client端并未连接server端,这个连接过程是在节点启动后才进行连接。
public void connectToNode(DiscoveryNode node, boolean light) {
//transport的模块必须要启动
if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
}
//获取读锁,每个节点可以和多个节点建立连接,因此这里用读锁
globalLock.readLock().lock();
try {
//以node.id为基础获取一个锁,这保证对于每个node只能建立一次连接
connectionLock.acquire(node.id());
try {
if (!lifecycle.started()) {
throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");
}
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
try {
if (light) {//这里的light,就是对该节点只获取一个channel,所有类型(5种连接类型下面会说到)都使用者一个channel
nodeChannels = connectToChannelsLight(node);
} else {
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
try {
connectToChannels(nodeChannels, node);
} catch (Throwable e) {
logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);
nodeChannels.close();
throw e;
}
}
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
transportServiceAdapter.raiseNodeConnected(node);
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
throw new ConnectTransportException(node, "general node connection failure", e);
}
} finally {
connectionLock.release(node.id());
}
} finally {
globalLock.readLock().unlock();
}
}
在每个server和client之间都有5个连接,每个连接承担着不同的任务:
protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
//五种连接方式,不同的连接方式对应不同的集群操作
ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];
ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
//尝试建立连接
for (int i = 0; i < connectRecovery.length; i++) {
connectRecovery[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectBulk.length; i++) {
connectBulk[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectReg.length; i++) {
connectReg[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectState.length; i++) {
connectState[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectPing.length; i++) {
connectPing[i] = clientBootstrap.connect(address);
}
//获取每个连接的channel存入到相应的channels中便于后面使用。
try {
for (int i = 0; i < connectRecovery.length; i++) {
connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectRecovery[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());
}
nodeChannels.recovery[i] = connectRecovery[i].getChannel();
nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectBulk.length; i++) {
connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectBulk[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());
}
nodeChannels.bulk[i] = connectBulk[i].getChannel();
nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectReg.length; i++) {
connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectReg[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());
}
nodeChannels.reg[i] = connectReg[i].getChannel();
nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectState.length; i++) {
connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectState[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());
}
nodeChannels.state[i] = connectState[i].getChannel();
nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectPing.length; i++) {
connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectPing[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
}
nodeChannels.ping[i] = connectPing[i].getChannel();
nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
if (nodeChannels.recovery.length == 0) {
if (nodeChannels.bulk.length > 0) {
nodeChannels.recovery = nodeChannels.bulk;
} else {
nodeChannels.recovery = nodeChannels.reg;
}
}
if (nodeChannels.bulk.length == 0) {
nodeChannels.bulk = nodeChannels.reg;
}
} catch (RuntimeException e) {
// clean the futures
for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {
future.cancel();
if (future.getChannel() != null && future.getChannel().isOpen()) {
try {
future.getChannel().close();
} catch (Exception e1) {
// ignore
}
}
}
throw e;
}
}
上例为节点建立连接的过程,每一对client和server间都会建立一定数量的不同连接,为什么要有这些不同的连接呢?是因为不同的操作消耗的资源也不同,请求的频率也不相同,对于资源消耗少、请求频率高的ping请求,可以多建立一些连接,来确保并发,对于资源消耗大的操作如bulk操作,则要少建立一些连接,防止机器负载过大可能导致节点失联。
总的来说,当nettytransport的连接过程,就是分别启动client和server,同时将messagechandler。并且只有当节点启动时,client会连接server,获取集群信息,包括之前的5个连接。
transport除了上述这些功能之外,还负责处理request相关请求。
transport处理请求
之前我们聊了transport的启动及连接,当这一切成功之后,transport还会负责处理请求。比如集群中master确认节点是否存在,节点获取集群的状态等。
为了保证信息传输,es定义了一个19个字节长度的信息头:
HEADER_SIZE = 2 + 4 + 8 + 1 + 4
以E
和S
开头,紧接着是4个字节的int类型信息长度,然后是8个字节的long类型的信息id,再是一个字节的status,最后是4个字节int类型的version。所有节点间的信息交互都以这个19个字节的头部开始。同时,es对于节点间的所有action都定义了名字,如对master的周期检测型action。每个action对应着相应的messagehandler:
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
//参数说明:node发送的目的节点,requestId请求id,action action名称,request请求,options包括以下几种操作 RECOVERY,BULK,REG,STATE,PING;
Channel targetChannel = nodeChannel(node, options);//获取对应节点的channel,channel在连接节点时初始化完成(请参考上一篇)
if (compress) {
options.withCompress(true);
}
byte status = 0;
//设置status 包括以下几种STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2;
status = TransportStatus.setRequest(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始写出流
boolean addedReleaseListener = false;
try {
bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置
StreamOutput stream = bStream;
// only compress if asked, and, the request is not bytes, since then only
// the header part is compressed, and the "body" can't be extracted as compressed
if (options.compress() && (!(request instanceof BytesTransportRequest))) {
status = TransportStatus.setCompress(status);
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
}
stream = new HandlesStreamOutput(stream);
// we pick the smallest of the 2, to support both backward and forward compatibility
// note, this is the only place we need to do this, since from here on, we use the serialized version
// as the version to use also when the node receiving this request will send the response with
Version version = Version.smallest(this.version, node.version());
stream.setVersion(version);
stream.writeString(transportServiceAdapter.action(action, version));
ReleasableBytesReference bytes;
ChannelBuffer buffer;
// it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
// that create paged channel buffers, but its tricky to know when to do it (where this option is
// more explicit).
if (request instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) request;
assert node.version().equals(bRequest.version());
bRequest.writeThin(stream);
stream.close();
bytes = bStream.bytes();
ChannelBuffer headerBuffer = bytes.toChannelBuffer();
ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
} else {
request.writeTo(stream);
stream.close();
bytes = bStream.bytes();
buffer = bytes.toChannelBuffer();
}
NettyHeader.writeHeader(buffer, requestId, status, version);//写信息头
ChannelFuture future = targetChannel.write(buffer);//写buffer同时获取future,发送信息发生在这里
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);//添加listener
addedReleaseListener = true;
transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
} finally {
if (!addedReleaseListener) {
Releasables.close(bStream.bytes());
}
}
}
上例展示了request的发送过程,获取目标node的channel封装请求写入信息头,然后发送并使用listener监听,这里的transportRequest是一个抽象类,继承了transportMessage并同时实现了streamable接口,各个功能都有相应的request。
有发就有收,transport仍将接收交给message处理,而message则转交给messageHandler处理,因为nettytransport的信息处理逻辑都在messageCHannelHandler的messageReceived方法中:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Transports.assertTransportThread();
Object m = e.getMessage();
if (!(m instanceof ChannelBuffer)) {//非buffer之间返回
ctx.sendUpstream(e);
return;
}
//解析message头
ChannelBuffer buffer = (ChannelBuffer) m;
int size = buffer.getInt(buffer.readerIndex() - 4);
transportServiceAdapter.received(size + 6);
// we have additional bytes to read, outside of the header
boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;
int markedReaderIndex = buffer.readerIndex();
int expectedIndexReader = markedReaderIndex + size;
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumlation buffer, which is cleaned each time
StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
//读取信息头中的几个重要元数据
long requestId = buffer.readLong();
byte status = buffer.readByte();
Version version = Version.fromId(buffer.readInt());
StreamInput wrappedStream;
…………
if (TransportStatus.isRequest(status)) {//处理请求
String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
if (buffer.readerIndex() != expectedIndexReader) {
if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
} else {
logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
}
buffer.readerIndex(expectedIndexReader);
}
} else {//处理响应
TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
handlerResponseError(wrappedStream, handler);
} else {
handleResponse(ctx.getChannel(), wrappedStream, handler);
}
} else {
// if its null, skip those bytes
buffer.readerIndex(markedReaderIndex + size);
}
…………
wrappedStream.close();
}
上述代码展示了信息处理逻辑。
那么,request和response是如何被处理的呢?先看request的处理代码:
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();//读出action的名字
transportServiceAdapter.onRequestReceived(requestId, action);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);//获取处理该信息的handler
if (handler == null) {
throw new ActionNotFoundTransportException(action);
}
final TransportRequest request = handler.newInstance();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
handler.messageReceived(request, transportChannel);//使用该handler处理信息。
} else {
threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
}
} catch (Throwable e) {
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
logger.warn("Actual Exception", e1);
}
}
return action;
}
上例虽然在关键部分已经加了标注,但是仍不能看到请求是如何处理的,因为集群中存在各种请求,比如ping、discovery等等。因此要对应多种处理方式,所以,request最终被提交给handler处理。
每个功能都有自己的handler,当request被提交handler时,会自动的调用相应的方法来处理。
request的完整处理流程是:messageRecevied方法收到信息判断时,将request转发给transportServiceApdater的handler方法。handler根据请求类型分发给对应的方法处理。
那response的处理流程是什么呢?response通过handlerResponse方法处理:
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();
response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
response.remoteAddress();
try {
response.readFrom(buffer);
} catch (Throwable e) {
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
return;
}
try {
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
handler.handleResponse(response);//转发给对应的handler
} else {
threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
}
} catch (Throwable e) {
handleException(handler, new ResponseHandlerFailureTransportException(e));
}
}
response的处理流程跟request类似。response也有对应的handler处理响应request。
最后,来总结一下nettytransport的信息处理流程:
- 信息通过request方法发送到目标节点。
- 目标节点的messageHandler收到该信息,确定是request还是response,然后将它们转发给transportServicedAdapter,transportServicedAdapter根据request或response类型交给对应的handler处理并反馈。
cluster discovery概述
es的cluster实现了自己的发现(discovery)机制Zen,discovery的功能包括:
- mater选举。
- master错误探测。
- cluster中节点探测。
- 单播广播的ping。
discovery是可配置模块,除了默认机制Zen,还有支持其他机制包括:
- Azure classic discovery 插件方式,广播。
- EC2 discovery 插件方式,广播。
- Google Compute Engine (GCE) discovery 插件方式,广播。
- Zen discovery 默认实现,广播/单播。
我们可以根据各插件规则配置自己的发现机制。该机制通过实现guice
的discoveryModule
对外提供注册和启动。发现模块对外提供接口discoveryService
。它本质上是一个discovery
的一个代理,所有的功能最终都由所绑定的discovery
实现。
当节点启动时通过discoveryModule
获取discoveryService
并启动它。这也是其他机制的实现方式。通过discoveryModule
对外提供绑定和获取,通过discoveryService
接口对外提供功能。
节点探测:discovery faultdetection
在es的设计中,一个集群必须有一个主节点(master node)。用来处理请求、索引的创建、修改、节点管理等。
当有了master节点,该节点就要对各子节点进行周期性(心跳机制)的探测,保证整个集群的健康。
主节点和各节点之间都会进行心跳检测,比如mater要确保各节点健康状况、是否宕机等,而子节点也要要确保master的健康状况,一旦master宕机,各子节点要重新选举新的master。这种相互间的心跳检测就是cluster的faultdetection
。下图展示了faultdetection
继承关系。
faultdetection
有两种实现方式,分别是master探测其他节点和其他节点对master的探测。faultdetection
抽象了方法handleTransportDisconnect
,该方法在内部类FDConnectionListener
中被调用。es中大量使用了listener的异步方式,因此可以大大的提升系统性能:
private class FDConnectionListener implements TransportConnectionListener {
@Override
public void onNodeConnected(DiscoveryNode node) {
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}
当faultdetection
启动时就会注册相应的FDConnectionListener
,在周期性检测时,发现有节点失联,会通过onNodeDisconnected
方法回调handleTransportDisconnect
进行处理。先来看masterFaultdetection
的启动代码:
private void innerStart(final DiscoveryNode masterNode) {
this.masterNode = masterNode;
this.retryCount = 0;
this.notifiedMasterFailure.set(false);
// 尝试连接master节点
try {
transportService.connectToNode(masterNode);
} catch (final Exception e) {
// 连接失败通知masterNode失败
notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]");
return;
}
//关闭之前的masterping,重启新的masterping
if (masterPinger != null) {
masterPinger.stop();
}
this.masterPinger = new MasterPinger();
// 周期之后启动masterPing,这里并没有周期启动masterPing,只是设定了延迟时间。
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
}
再来看master连接失败的处理逻辑:
private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) {
if (notifiedMasterFailure.compareAndSet(false, true)) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
//通知所有listener master丢失
for (Listener listener : listeners) {
listener.onMasterFailure(masterNode, reason);
}
}
});
stop("master failure, " + reason);
}
}
zen discovery
机制实现了listener.onMasterFailure
接口,处理master失联的相关问题。下面是部分示例代码:
private class MasterPinger implements Runnable {
private volatile boolean running = true;
public void stop() {
this.running = false;
}
@Override
public void run() {
if (!running) {
// return and don't spawn...
return;
}
final DiscoveryNode masterToPing = masterNode;
final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override
public MasterPingResponseResponse newInstance() {
return new MasterPingResponseResponse();
}
@Override
public void handleResponse(MasterPingResponseResponse response) {
if (!running) {
return;
}
// reset the counter, we got a good result
MasterFaultDetection.this.retryCount = 0;
// check if the master node did not get switched on us..., if it did, we simply return with no reschedule
if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
// 启动新的ping周期
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
}
}
@Override
public void handleException(TransportException exp) {
if (!running) {
return;
}
synchronized (masterNodeMutex) {
// check if the master node did not get switched on us...
if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
handleTransportDisconnect(masterToPing);
return;
} else if (exp.getCause() instanceof NoLongerMasterException) {
logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
notifyMasterFailure(masterToPing, "no longer master");
return;
} else if (exp.getCause() instanceof NotMasterException) {
logger.debug("[master] pinging a master {} that is not the master", masterNode);
notifyMasterFailure(masterToPing, "not master");
return;
} else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode);
notifyMasterFailure(masterToPing, "do not exists on master, act as master failure");
return;
}
int retryCount = ++MasterFaultDetection.this.retryCount;
logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
if (retryCount >= pingRetryCount) {
logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout);
// not good, failure
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
}
}
}
}
);
}
}
masterPing
是一个线程,在innerStart
的方法中没有设定周期启动masterPing
,但是由于masterPing
需要进行心跳检测,这个问题就交给了上例的run
方法。如果ping成功就会重启一个新的ping,这样既保证了ping线程的唯一性同时也保证了ping的顺序和间隔。ping的方式同样是通过transport
发送一个masterPingRequest
进行连接,节点收到该请求后,如果该节点已不再是master就会抛出一个NotMasterException
。否则会响应notifyMasterFailure
方法。对于网络问题导致的无响应情况,会调用handleTransportDisconnect(masterToPing)
方法处理:
protected void handleTransportDisconnect(DiscoveryNode node) {
//这里需要同步
synchronized (masterNodeMutex) {
//master 已经换成其它节点,就没必要再连接
if (!node.equals(this.masterNode)) {
return;
}
if (connectOnNetworkDisconnect) {
try {
//尝试再次连接
transportService.connectToNode(node);
// if all is well, make sure we restart the pinger
if (masterPinger != null) {
masterPinger.stop();
}
//连接成功启动新的masterping
this.masterPinger = new MasterPinger();
// we use schedule with a 0 time value to run the pinger on the pool as it will run on later
threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
} catch (Exception e) {
//连接出现异常,启动master节点丢失通知
logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
notifyMasterFailure(masterNode, "transport disconnected (with verified connect)");
}
} else {
//不需要重连,通知master丢失。
logger.trace("[master] [{}] transport disconnected", node);
notifyMasterFailure(node, "transport disconnected");
}
}
}
就是masterfaultDetection
的整个流程:
启动中如果master节点失联则通知节点丢失,否则在一定延迟(3s)后启动masterPing
,masterPing
线程尝试连接master节点,如果master节点仍然失联,则再次尝试连接。master节点收到masterPingRequest
请求后首先看一下自己还是不是master,如果不是则抛出异常,否则正常回应。节点如果收到响应式异常则启动master丢失通知,否则此次ping结束。在一定时间后重新启动新的masterPing
线程。
这里只是说master的faultdetection,而node的faultdetection跟master逻辑相似。区别主要在于ping异常处理上。
在node的faultdetection中,当某个node出现异常或者没有响应,会启动node丢失机制,只是具体的处理逻辑不同。
discovery ping机制
ping是es中集群发现的基本手段,通过在局域网中广播或者指定ping的某些节点(单播)获取集群信息和节点加入集群等操作。ZenDiscovery
机制实现了两种ping机制:
- 广播,当es实例启动的时候,它发送了广播的ping请求到地址
224.2.2.4:54328
。而其他的es实例使用同样的集群名称响应了这个请求。 - 单播,各节点通过单播列表来发现彼此从而加入同一个集群。
广播的原理很简单,当一个节点启动后向局域网发送广播信息,任何收到节点只要集群名称和该节点相同,就会对此广播作出回应。这样这个节点就能获取集群相关的信息。它定义了一个action:discovery/zen/multicast
和广播的信息头INTERNAL_HEADER
。
在之前说过,nettyTransport
是cluster的通信基础,但是广播却没有使用 ,而是采用了java的multicastsocket
。而multicastsocket
是一个UDP的socket,用来进行多个数据包的广播。它将节点间的通信组成一个组。任何multicastsocket
都可以加入进来,组内的socket发送信息会被组内其他的节点接收。es将其进一步封装成multicastchannel
。
mutlicastZenPing
共定义了4个内部类,共同完成广播功能:
- finalizingPingCollection是一个pingresponse的容器,用来存储所有的响应。
- multicastPingResponseRequestHander是response处理类,类似于之前说的nettytransporthandler,这里虽然没有使用netty,但是也定义了一个messageReceived方法,当收到一个请求时直接返回一个response。
- multicastPingResponse是一个响应类。
- Received类处理消息逻辑,也是最重要的一个类。
刚才说了,因为广播没有使用nettytransport,所以对于消息的逻辑处理都在received类中。在初始化的时候,multicastZenPing
时会将received注册进去:
protected void doStart() throws ElasticsearchException {
try {
....
multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
new Receiver());//将receiver注册到channel中
} catch (Throwable t) {
....
}
}
received
类继承了listener
,实现了3个方法,消息经过onMessage
方法区分,如果是内部的ping则使用handlerNodePingRequest
方法处理,否则使用handlerExternalPingRequest
处理。那怎么区分这个消息到底是内部ping还是外部的ping呢?区分方法也很简单,就是读取消息中的关于INTERNAL_HEADER
信息头,下面是nodePing
的相关代码:
private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
....
final DiscoveryNodes discoveryNodes = contextProvider.nodes();
final DiscoveryNode requestingNode = requestingNodeX;
if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
// 自身发出的ping,忽略
return;
}
//只接受本集群ping
if (!requestClusterName.equals(clusterName)) {
...return;
}
// 两个client间不需要ping
if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
}
//新建一个response
final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
multicastPingResponse.id = id;
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
//无法连接的情况
if (!transportService.nodeConnected(requestingNode)) {
// do the connect and send on a thread pool
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
} catch (Exception e) {
if (lifecycle.started()) {
logger.warn("failed to connect to requesting node {}", e, requestingNode);
}
}
}
});
} else {
transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
if (lifecycle.started()) {
logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
}
});
}
}
}
上述代码描述了如何处理内部ping,接下来再说说如何处理来自外部的ping信息。当收到其他节点的响应信息后,它会把本节点及集群的master节点相关信息返回广播节点。这样广播节点就获知了集群信息。
在multicastZenPing
类中还有一个类multicastPingResponseRequestHandler
,它的作用是广播节点对于其他节点广播信息响应的回应。广播节点的第二次发送信息的过程,它跟其他transportRequestHandler
一样有messageReceived
方法。在启动时注册到transportserver
中,只处理一类actioninternal:discovery/zen/multicast
。
我们再来看ping请求的发送策略代码:
public void ping(final PingListener listener, final TimeValue timeout) {
....
//产生一个id
final int id = pingIdGenerator.incrementAndGet();
try {
receivedResponses.put(id, new PingCollection());
sendPingRequest(id);//第一次发送ping请求
// 等待时间的1/2后再次发送一个请求
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send second ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
sendPingRequest(id);
//再过1/2时间再次发送一个请求
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to send third ping request", t, id);
finalizePingCycle(id, listener);
}
@Override
public void doRun() {
// make one last ping, but finalize as soon as all nodes have responded or a timeout has past
PingCollection collection = receivedResponses.get(id);
FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
receivedResponses.put(id, finalizingPingCollection);
logger.trace("[{}] sending last pings", id);
sendPingRequest(id);
//最后一次发送请求,超时的1/4后
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.warn("[{}] failed to finalize ping", t, id);
}
@Override
protected void doRun() throws Exception {
finalizePingCycle(id, listener);
}
});
}
});
}
});
} catch (Exception e) {
logger.warn("failed to ping", e);
finalizePingCycle(id, listener);
}
}
ping的过程主要是调用sendPingRequest(id)
方法,在该方法中将id、版本、本地节点信息一起写入到BytesStreamOutput
中,然后将其进行广播。这个广播信息会被其他机器上的Received
接收并处理,并且响应ping请求。另外一个需要关注的是上述代码中注释的部分。它通过链式定期发送请求,在等待的时间内可能会发送4次请求,但这也带来了一些问题,这种发送方式会造成大量的ping请求重复,但幸运的是ping请求资源消耗较小。并且带来的好处也显而易见,因为这样尽可能的保证了在timeout的时间段内,集群新增节点都能收到这个ping信息,这种方式应用于单播发现中。
简要来说,广播使用了java的multicastsocket
,在timeout时间内发送4次ping请求,该请求包括一个id、信息头、本地节点信息。这些信息在被其他响应节点接收,交给Received
处理,Received
会将集群的master和本节点的相关信息通过transport
返回给广播节点。广播节点收到这些信息后会立即使用transport
返回一个空的response。至此一个广播过程完成。
广播虽好,但我选择单播!因为当节点在分布在多个网段时,广播模式就失效了,因为广播信息不可达!这个时候就要使用单播去向指定的节点发送ping请求获取cluster的相关信息。这就是单播的用处与优点。
单播使用的是nettytransport
,它会使用跟广播一样,通过链式请求向指定的节点发送请求,信息的处理方式是nettytransport
标准的信息处理过程。
欢迎斧正,that's all 本文主要参考:[elasticsearch节点间通信的基础transport](https://www.cnblogs.com/zziawanblog/p/6523706.html) | [elasticsearch transport 请求发送和处理](https://www.cnblogs.com/zziawanblog/p/6528616.html) | [cluster discovery概述及FaultDetection分析](https://www.cnblogs.com/zziawanblog/p/6533731.html) | [zendiscovery 的Ping机制](https://www.cnblogs.com/zziawanblog/p/6551549.html)