[从源码学设计]蚂蚁金服SOFARegistry 之 如何与Meta Server交互
0x00 摘要
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。
本文为第十篇,主要是从业务角度进行梳理。看看DataServer如何与MetaServer交互。
0x01 业务范畴
1.1 MetaServer的重要性
首先我们要复习下MetaServer的重要性。
MetaServer元数据服务器集群。这个集群管辖的范围是 Session 服务器集群和 Data 服务器集群的服务器信息,其角色就相当于 SOFARegistry 架构内部的服务注册中心,只不过 SOFARegistry 作为服务注册中心是服务于广大应用服务层,而 Meta 集群是服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层能够感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。
所以,如果想获取节点的变化,DataServer就必须重点研究如何与MetaServer交互。
1.2 推拉模型
居于Bolt协议,DataServer在与Meta Server的交互中,使用了推拉模型。
1.3 分析策略
我们在这里重点分析其设计策略如下:
- 用什么来确保交互的有效性。
- 用什么来解耦。
- 用什么来确保网络交互的效率。
0x02 目录结构
此模块目录结构如下,大致可以推论,
-
DefaultMetaServiceImpl 是 Meta Server 相关模块主体;
-
MetaServerConnectionFactory是连接管理;
-
ConnectionRefreshMetaTask 是定期循环task;
-
handler目录下是三个响应函数;
-
provideData 目录下是配置相关功能;
具体目录结构如下:
│ ├── metaserver
│ │ ├── DefaultMetaServiceImpl.java
│ │ ├── IMetaServerService.java
│ │ ├── MetaServerConnectionFactory.java
│ │ ├── handler
│ │ │ ├── NotifyProvideDataChangeHandler.java
│ │ │ ├── ServerChangeHandler.java
│ │ │ └── StatusConfirmHandler.java
│ │ ├── provideData
│ │ │ ├── ProvideDataProcessor.java
│ │ │ ├── ProvideDataProcessorManager.java
│ │ │ └── processor
│ │ │ └── DatumExpireProvideDataProcessor.java
│ │ └── task
│ │ └── ConnectionRefreshMetaTask.java
0x03 Bean
MetaServer相关组件如下:
- metaServerService,用来与MetaServer进行交互,基于raft和Bolt;
- datumLeaseManager,用来维护具体数据;
0x04 Raft协议
这里有一个问题 :为什么 DataServerBootstrap 之中还有 startRaftClient,按说DataServer只用Http和Bolt就可以了。
原来是用 raft 协议来获取MetaServer集群中leader的地址等信息: raftClient.getLeader();
比如 renewNodeTask 时候会用到。
Raft相关启动是在startRaftClient,此函数的作用是:
- 启动Raft客户端,保证分布式一致性;
- 向 EventCenter 投放MetaServerChangeEvent;
具体代码是:
private void startRaftClient() {
metaServerService.startRaftClient();
eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
0x05 消息处理
前面提到了,当系统启动之后,会主动发送一个MetaServerChangeEvent,我们就看看其内容。
5.1 MetaServerChangeEvent
public class MetaServerChangeEvent implements Event {
private Map<String, Set<String>> ipMap;
/**
* constructor
* @param ipMap
*/
public MetaServerChangeEvent(Map<String, Set<String>> ipMap) {
this.ipMap = ipMap;
}
public Map<String, Set<String>> getIpMap() {
return ipMap;
}
}
其运行状态如下:
event = {MetaServerChangeEvent@5991}
ipMap = {HashMap@5678} size = 1
"DefaultDataCenter" -> {ConcurrentHashMap$KeySetView@6007} size = 1
5.2 消息来源
MetaServerChangeEvent有三种来源:启动主动获取,定期,推送。这三种具体如下:
- 启动主动获取:这个主动查询并且拉取的过程,这个过程基本上类似一个同步过程,体现为客户端一次查询结果的同步返回。
- 版本变更推送:为了确定服务发布数据的变更,对于这个服务感兴趣的所有客户端订阅方都需要推送,进行推送。由于性能要求必须并发执行并且异步确定推送成功。
- 定期轮训:这样避免了某次变更通知没有通知到所有订阅方的情况。
我们先简述来源:
5.2.1 启动
这就是上面提到的,启动时会从配置里面读取meta server配置,metaServerService.getMetaServerMap();据此构建MetaServerChangeEvent,投放到EventCenter之中。
当 DataServer 节点初始化成功后,会启动任务自动去连接 MetaServer。即,该任务会往事件中心 EventCenter 注册一个 DataServerChangeEvent 事件,该事件注册后会被触发,之后将对新增节点计算 Hash 值,同时进行纳管分片。
具体启动时,会从配置里面读取meta server配置,metaServerService.getMetaServerMap();据此构建MetaServerChangeEvent,投放到EventCenter之中。
private void startRaftClient() {
metaServerService.startRaftClient();
eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
堆栈如下
register:44, MetaServerConnectionFactory (com.alipay.sofa.registry.server.data.remoting.metaserver)
registerMetaServer:129, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
doHandle:92, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
doHandle:55, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
handle:51, AbstractEventHandler (com.alipay.sofa.registry.server.data.event.handler)
post:56, EventCenter (com.alipay.sofa.registry.server.data.event)
startRaftClient:197, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap)
start:131, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap)
start:47, DataServerInitializer (com.alipay.sofa.registry.server.data.bootstrap)
doStart:173, DefaultLifecycleProcessor (org.springframework.context.support)
access$200:50, DefaultLifecycleProcessor (org.springframework.context.support)
start:350, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
startBeans:149, DefaultLifecycleProcessor (org.springframework.context.support)
onRefresh:112, DefaultLifecycleProcessor (org.springframework.context.support)
finishRefresh:880, AbstractApplicationContext (org.springframework.context.support)
refresh:546, AbstractApplicationContext (org.springframework.context.support)
refresh:693, SpringApplication (org.springframework.boot)
refreshContext:360, SpringApplication (org.springframework.boot)
run:303, SpringApplication (org.springframework.boot)
run:1118, SpringApplication (org.springframework.boot)
run:1107, SpringApplication (org.springframework.boot)
main:41, DataApplication (com.alipay.sofa.registry.server.data)
5.2.2 定时
这部分是ConnectionRefreshMetaTask完成。ConnectionRefreshMetaTask 是定期 task,其在 Bean tasks 里面配置。
StartTaskEventHandler 会调用到 tasks,当收到 StartTaskEvent 之后,会启动 tasks里面的几个AbstractTask。
public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {
@Resource(name = "tasks")
private List<AbstractTask> tasks;
private ScheduledExecutorService executor = null;
@Override
public List<Class<? extends StartTaskEvent>> interest() {
return Lists.newArrayList(StartTaskEvent.class);
}
@Override
public void doHandle(StartTaskEvent event) {
if (executor == null || executor.isShutdown()) {
getExecutor();
}
for (AbstractTask task : tasks) {
if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),task.getTimeUnit());
}
}
}
private void getExecutor() {
executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
.getSimpleName());
}
}
具体tasks如下:
@Bean(name = "tasks")
public List<AbstractTask> tasks() {
List<AbstractTask> list = new ArrayList<>();
list.add(connectionRefreshTask());
list.add(connectionRefreshMetaTask());
list.add(renewNodeTask());
return list;
}
ConnectionRefreshMetaTask 是定期task,会定期向EventCenter投放一个 MetaServerChangeEvent。
执行时候调用 metaServerService.getMetaServerMap();
返回一个MetaServerChangeEvent,并且添加到EventCenter之中。
public class ConnectionRefreshMetaTask extends AbstractTask {
@Autowired
private IMetaServerService metaServerService;
@Autowired
private EventCenter eventCenter;
@Override
public void handle() {
eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
}
5.2.3 推送
ServerChangeHandler 是 metaClientHandler 的一部分,是MetaNodeExchanger 的响应函数。
ServerChangeHandler 继承了AbstractClientHandler,在interest之中,配置了会响应NodeChangeResult。
如果Meta有推送,ServerChangeHandler这里就有响应,这个会是 Meta Server 主动通知。
在ServerChangeHandler之中,拿到了NodeChangeResult之后,会判断变更节点类型,这里会根据 Note 类型不同,决定产生 DataServerChangeEvent 还是MetaServerChangeEvent。如果是NodeType.META,就发送消息给eventCenter,eventCenter.post(new MetaServerChangeEvent(map));
,这就是MetaServerChangeEvent的来源之一。
public class ServerChangeHandler extends AbstractClientHandler<NodeChangeResult> {
@Autowired
private EventCenter eventCenter;
@Autowired
private DataServerConfig dataServerConfig;
@Override
public Object doHandle(Channel channel, NodeChangeResult request) {
ExecutorFactory.getCommonExecutor().execute(() -> {
if (request.getNodeType() == NodeType.DATA) {
eventCenter.post(new DataServerChangeEvent(request.getNodes(),
request.getDataCenterListVersions(), FromType.META_NOTIFY));
} else if (request.getNodeType() == NodeType.META) {
Map<String, Map<String, MetaNode>> metaNodesMap = request.getNodes();
if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig.getLocalDataCenter());
if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
HashMap<String, Set<String>> map = new HashMap<>();
map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
eventCenter.post(new MetaServerChangeEvent(map));
}
}
}
});
return CommonResponse.buildSuccessResponse();
}
@Override
public Class interest() {
return NodeChangeResult.class;
}
@Override
public HandlerType getType() {
return HandlerType.PROCESSER;
}
@Override
protected Node.NodeType getConnectNodeType() {
return Node.NodeType.DATA;
}
}
此时逻辑图如下,可以看到三种MetaServerChangeEvent消息来源,ServerChangeHandler也会提供DataServerChangeEvent:
+-------------------------------+
|[DataServerBootstrap] | MetaServerChangeEvent
| |
| +-------------------------+
| startRaftClient | |
| | |
| | |
+-------------------------------+ |
+-------------------------------+ |
| [Timer] | |
| | | +-------------+
| ConnectionRefreshMetaTask +------------------------------> | EventCenter |
| | MetaServerChangeEvent | +-------+-----+
+-------------------------------+ | ^
+-------------------------------+ | |
| [Push<NodeChangeResult>] | | |
| | | |
| +-------------------------+ |
| | MetaServerChangeEvent |
| ServerChangeHandler | |
| +----------------------------------------+
+-------------------------------+ DataServerChangeEvent
0x06 MetaServerChangeEventHandler
MetaServerChangeEventHandler 用来响应 MetaServerChangeEvent 消息。因为其继承了AbstractEventHandler,所以 MetaServerChangeEventHandler 已经注册到了EventCenter之上。
注意,这里有一个再次转换DataServerChangeEvent的过程,即这里又会主动和MetaServer交互,如果返回消息是NodeChangeResult,就转换为DataServerChangeEvent。
这是因为Meta Server的这个推送,也许是告诉data Server,"hi,目前data server也有变动,兄弟你再来拉取下"。
在处理时候,MetaServerChangeEventHandler会去与MetaServer交互,看看其有效性,如果有效,就注册。
逻辑如下:
- 在MetaServerChangeEventHandler之中,会遍历MetaServerChangeEvent之中的 dataCenter, ip进行注册,registerMetaServer(dataCenter, ip); 在registerMetaServer之中:
- 获取 meta server的 leader;
- 使用 metaNodeExchanger.connect 对 IP,getMetaServerPort 进行连接;
- 得到Channel之后,注册到 metaServerConnectionFactory 之中;
- 如果 ip不是meta leader,则再次调用metaNodeExchanger注册自己
DataNode(new URL(DataServerConfig.IP), dataServerConfig .getLocalDataCenter());
- 注册成功之后,则给EventCenter发送 DataServerChangeEvent,内部继续处理 ;
具体代码如下:
public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private IMetaServerService metaServerService;
@Autowired
private MetaNodeExchanger metaNodeExchanger;
@Autowired
private EventCenter eventCenter;
@Autowired
private MetaServerConnectionFactory metaServerConnectionFactory;
@Override
public List<Class<? extends MetaServerChangeEvent>> interest() {
return Lists.newArrayList(MetaServerChangeEvent.class);
}
@Override
public void doHandle(MetaServerChangeEvent event) {
Map<String, Set<String>> ipMap = event.getIpMap();
for (Entry<String, Set<String>> ipEntry : ipMap.entrySet()) {
String dataCenter = ipEntry.getKey();
Set<String> ips = ipEntry.getValue();
if (!CollectionUtils.isEmpty(ips)) {
for (String ip : ips) {
Connection connection = metaServerConnectionFactory.getConnection(dataCenter,
ip);
if (connection == null || !connection.isFine()) {
registerMetaServer(dataCenter, ip);
}
}
Set<String> ipSet = metaServerConnectionFactory.getIps(dataCenter);
for (String ip : ipSet) {
if (!ips.contains(ip)) {
metaServerConnectionFactory.remove(dataCenter, ip);
}
}
} else {
//remove connections of dataCenter if the connectionMap of the dataCenter in ipMap is empty
removeDataCenter(dataCenter);
}
}
//remove connections of dataCenter if the dataCenter not exist in ipMap
Set<String> dataCenters = metaServerConnectionFactory.getAllDataCenters();
for (String dataCenter : dataCenters) {
if (!ipMap.containsKey(dataCenter)) {
removeDataCenter(dataCenter);
}
}
}
private void registerMetaServer(String dataCenter, String ip) {
PeerId leader = metaServerService.getLeader();
for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
try {
Channel channel = metaNodeExchanger.connect(new URL(ip, dataServerConfig
.getMetaServerPort()));
//connect all meta server
if (channel != null && channel.isConnected()) {
metaServerConnectionFactory.register(dataCenter, ip,
((BoltChannel) channel).getConnection());
}
//register leader meta node
if (ip.equals(leader.getIp())) {
Object obj = null;
try {
obj = metaNodeExchanger.request(new Request() {
@Override
public Object getRequestBody() {
return new DataNode(new URL(DataServerConfig.IP), dataServerConfig
.getLocalDataCenter());
}
@Override
public URL getRequestUrl() {
return new URL(ip, dataServerConfig.getMetaServerPort());
}
}).getResult();
}
if (obj instanceof NodeChangeResult) {
NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
Map<String, Long> versionMap = result.getDataCenterListVersions();
//send renew after first register dataNode
Set<StartTaskTypeEnum> set = new HashSet<>();
set.add(StartTaskTypeEnum.RENEW);
eventCenter.post(new StartTaskEvent(set));
eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,DataServerChangeEvent.FromType.REGISTER_META));
break;
}
}
}
}
}
此时逻辑图如下:
+-------------------------------+
|[DataServerBootstrap] | MetaServerChangeEvent
| |
| +-------------------------+
| startRaftClient | |
| | | +---------------+
| | | | |
+-------------------------------+ | | |
+-------------------------------+ | | |
| [Timer] | | v |
| | | 1 +-------+-----+ |
| ConnectionRefreshMetaTask +------------------------------> | EventCenter +----+ |
| | MetaServerChangeEvent | +-------+-----+ | |
+-------------------------------+ | ^ | |
+-------------------------------+ | | | |
| | | | | |
| [Push<NodeChangeResult>] | | | | |
| | | | | |
| +-------------------------+ | | |
| | MetaServerChangeEvent | | |
| ServerChangeHandler | 2 | | |
| +----------------------------------------+ | |
+-------------------------------+ DataServerChangeEvent | |
| |
| |
MetaServerChangeEvent | |
3 | |
+----------------------------------------------------+ |
| |
v |
+-----------------+--------------+ DataServerChangeEvent |
| | 4 |
| MetaServerChangeEventHandler +------------------------------------------+
| |
+--------------------------------+
手机如下:
6.1 连接管理
下面我们讲讲dataServer如何管理metaServer的连接。
我们知道,一次 tcp 请求大致分为三个步骤:建立连接、通信、关闭连接。每次建立新连接都会经历三次握手,中间包含三次网络传输,对于高并发的系统,这是一笔不小的负担;关闭连接同样如此。为了减少每次网络调用请求的开销,对连接进行管理、复用,可以极大的提高系统的性能。
为了提高通信效率,我们需要考虑复用连接,减少 TCP 三次握手的次数,因此需要有连接管理的机制。
关于连接管理,SOFARegistry有两个层次的连接管理,分别是 Connection 和 Node。
6.1.1 Connection管理
可以用socket(localIp,localPort, remoteIp,remotePort )代表一个连接,在Netty中用Channel来表示,在sofa-bolt使用Connection对象来抽象一个连接,一个连接在client跟server端各用一个connection对象表示。
有了Connection这个抽象之后,自然的需要提供接口来管理Connection, 这个接口就是ConnectionFactory。
6.1.2 ConnectionFactory
不论是服务端还是客户端,其实本质都在做一件事情:创建 ConnectionEventHandler 实例并添加到 Netty 的 pipeline 中。
之后当有 ConnectionEvent 触发时(无论是 Netty 定义的事件被触发,还是 SOFABolt 定义的事件被触发),ConnectionEventHandler 会通过异步线程执行器通知 ConnectionEventListener,ConnectionEventListener 将消息派发给具体的 ConnectionEventProcessor 实现类。
6.1.3 MetaServerConnectionFactory
metaServerConnectionFactory 是用来存储所有 meta Sever Connection,这是Bolt的机制应用,需要维持一个长连接。
MetaServerChangeEvent 内容是:dataCenter,以及其下面的Data Server ip 列表。对应MetaServerConnectionFactory 的 MAP 是:
Map< dataCenter : Map<ip, Connection> >
具体定义如下:
public class MetaServerConnectionFactory {
private final Map<String, Map<String, Connection>> MAP = new ConcurrentHashMap<>();
/**
*
* @param dataCenter
* @param ip
* @param connection
*/
public void register(String dataCenter, String ip, Connection connection) {
Map<String, Connection> connectionMap = MAP.get(dataCenter);
if (connectionMap == null) {
Map<String, Connection> newConnectionMap = new ConcurrentHashMap<>();
connectionMap = MAP.putIfAbsent(dataCenter, newConnectionMap);
if (connectionMap == null) {
connectionMap = newConnectionMap;
}
}
connectionMap.put(ip, connection);
}
}
6.1.4 添加Connection
只是在 MetaServerChangeEventHandler . doHandle 函数中有添加操作,调用了metaServerConnectionFactory.register
。
所以在 doHandle 函数中,遍历Event所有的 meta Server IP,这里每一个ip对应一个 data Center。对于每一个ip做如下操作:
- 重连registerMetaServer。
- connect all meta server,就是把Connection放进MetaServerConnectionFactory;
- register leader meta node,就是重新向 leader meta node 发送一个 DataNode 请求;
- 当收到请求结果时候,根据结果内容,往 EventCenter中插入DataServerChangeEvent,这个以后处理;
- 如果MetaServerConnectionFactory中有在Event中不存在的 meta server ip,就从 MetaServerConnectionFactory 中移除。
- 如果 MetaServerConnectionFactory 中有在Event中不存在的 data server ip,就
removeDataCenter(dataCenter);
其中使用了metaNodeExchanger去连接metaServer。具体代码如下:
private void registerMetaServer(String dataCenter, String ip) {
PeerId leader = metaServerService.getLeader();
for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
try {
Channel channel = metaNodeExchanger.connect(new URL(ip, dataServerConfig
.getMetaServerPort()));
//connect all meta server
if (channel != null && channel.isConnected()) {
metaServerConnectionFactory.register(dataCenter, ip,
((BoltChannel) channel).getConnection());
}
//其他操作
}
}
MetaServerConnectionFactory在运行时如下:
metaServerConnectionFactory = {MetaServerConnectionFactory@5387}
MAP = {ConcurrentHashMap@6154} size = 1
"DefaultDataCenter" -> {ConcurrentHashMap@6167} size = 1
0x07 MetaNodeExchanger
dataServer和metaServer之间是推拉模型交互。
MetaNodeExchanger 是 bolt Exchange,把metaServer相关的网络操作集中在一起。无论是MetaServerChangeEventHandler还是DefaultMetaServiceImpl,都基于此与Meta Server交互。其中
-
connect 设置了响应函数metaClientHandlers
-
而 request 时候,如果失败了,则会 metaServerService.refreshLeader().getIp() 刷新地址,重新调用。
这里会测试MetaServer有效性 。
public class MetaNodeExchanger implements NodeExchanger {
@Autowired
private Exchange boltExchange;
@Autowired
private IMetaServerService metaServerService;
@Autowired
private DataServerConfig dataServerConfig;
@Resource(name = "metaClientHandlers")
private Collection<AbstractClientHandler> metaClientHandlers;
@Override
public Response request(Request request) {
Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
try {
final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(),
dataServerConfig.getRpcTimeout());
return () -> result;
} catch (Exception e) {
//retry
URL url = new URL(metaServerService.refreshLeader().getIp(),
dataServerConfig.getMetaServerPort());
final Object result = client.sendSync(url, request.getRequestBody(),
request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
return () -> result;
}
}
public Channel connect(URL url) {
Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
if (client == null) {
synchronized (this) {
client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
if (client == null) {
client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));
}
}
}
//try to connect data
Channel channel = client.getChannel(url);
if (channel == null) {
synchronized (this) {
channel = client.getChannel(url);
if (channel == null) {
channel = client.connect(url);
}
}
}
return channel;
}
}
7.1 Client Handler
MetaNodeExchanger响应Handler如下,这部分是推模型,前面已经提到了,serverChangeHandler会响应推送。
@Bean(name = "metaClientHandlers")
public Collection<AbstractClientHandler> metaClientHandlers() {
Collection<AbstractClientHandler> list = new ArrayList<>();
list.add(serverChangeHandler());
list.add(statusConfirmHandler());
list.add(notifyProvideDataChangeHandler());
return list;
}
0x08 核心服务
DefaultMetaServiceImpl是Meta Server相关服务的核心实现。
8.1 DefaultMetaServiceImpl
其中,raftClient是raft的入口,metaNodeExchanger 是bolt的入口。metaServerConnectionFactory 保存目前所有的 meta server bolt connection。
public class DefaultMetaServiceImpl implements IMetaServerService {
@Autowired
private DataServerConfig dataServerConfig;
@Autowired
private MetaNodeExchanger metaNodeExchanger;
@Autowired
private MetaServerConnectionFactory metaServerConnectionFactory;
@Autowired
private DataServerCache dataServerCache;
private RaftClient raftClient;
}
8.2 刷新
刷新是重要功能之一,用来获取raft leader。
@Override
public PeerId getLeader() {
if (raftClient == null) {
startRaftClient();
}
PeerId leader = raftClient.getLeader();
if (leader == null) {
throw new RuntimeException(
"[DefaultMetaServiceImpl] register MetaServer get no leader!");
}
return leader;
}
@Override
public PeerId refreshLeader() {
if (raftClient == null) {
startRaftClient();
}
PeerId leader = raftClient.refreshLeader();
if (leader == null) {
throw new RuntimeException("[RaftClientManager] refresh MetaServer get no leader!");
}
return leader;
}
8.3 重连
另外一个重要功能是重连。
getMetaServerMap完成了重连,getMetaServerMap 的作用:
- 获取 Meta Server 的IP列表,放入set;
- 获取 Meta Server 的 Connection列表,放入connectionMap;
- 如果 connectionMap 是空,则对于 set 中的 ip列表,进行重连;
- 如果 connectionMap 非空,则对于 connectionMap 中的 ip列表,进行重连;
- 拿到上面的 Connection 之后,进行调用
GetNodesRequest(NodeType.META)
; - 根据
GetNodesRequest(NodeType.META)
的结果 NodeChangeResult,构建一个 MetaServerChangeEvent,放入EventCenter。eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
具体代码如下:
@Override
public Map<String, Set<String>> getMetaServerMap() {
HashMap<String, Set<String>> map = new HashMap<>();
Set<String> set = dataServerConfig.getMetaServerIpAddresses();
Map<String, Connection> connectionMap = metaServerConnectionFactory
.getConnections(dataServerConfig.getLocalDataCenter());
Connection connection = null;
try {
if (connectionMap.isEmpty()) {
List<String> list = new ArrayList(set);
Collections.shuffle(list);
connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()
.next(), dataServerConfig.getMetaServerPort()))).getConnection();
} else {
List<Connection> connections = new ArrayList<>(connectionMap.values());
Collections.shuffle(connections);
connection = connections.iterator().next();
if (!connection.isFine()) {
connection = ((BoltChannel) metaNodeExchanger.connect(new URL(connection
.getRemoteIP(), dataServerConfig.getMetaServerPort()))).getConnection();
}
}
GetNodesRequest request = new GetNodesRequest(NodeType.META);
final Connection finalConnection = connection;
Object obj = metaNodeExchanger.request(new Request() {
@Override
public Object getRequestBody() {
return request;
}
@Override
public URL getRequestUrl() {
return new URL(finalConnection.getRemoteIP(), finalConnection.getRemotePort());
}
}).getResult();
if (obj instanceof NodeChangeResult) {
NodeChangeResult<MetaNode> result = (NodeChangeResult<MetaNode>) obj;
Map<String, Map<String, MetaNode>> metaNodesMap = result.getNodes();
if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig
.getLocalDataCenter());
if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
}
}
}
}
return map;
}
其中,具体获取MetaServer信息是在
@ConfigurationProperties(prefix = DataServerConfig.PRE_FIX)
public class DataServerConfig {
/**
* Getter method for property <tt>metaServerIpAddress</tt>.
*
* @return property value of metaServerIpAddress
*/
public Set<String> getMetaServerIpAddresses() {
if (metaIps != null && !metaIps.isEmpty()) {
return metaIps;
}
metaIps = new HashSet<>();
if (commonConfig != null) {
Map<String, Collection<String>> metaMap = commonConfig.getMetaNode();
if (metaMap != null && !metaMap.isEmpty()) {
String localDataCenter = commonConfig.getLocalDataCenter();
if (localDataCenter != null && !localDataCenter.isEmpty()) {
Collection<String> metas = metaMap.get(localDataCenter);
if (metas != null && !metas.isEmpty()) {
metaIps = metas.stream().map(NetUtil::getIPAddressFromDomain).collect(Collectors.toSet());
}
}
}
}
return metaIps;
}
}
0x09 后续
在文中我们可以看到,MetaServerChangeEvent也会转化为 DataServerChangeEvent,投放到EventCenter。
如前图的2,4两步。这是因为Meta Server的这个推送,也许是告诉data Server,"hi,目前data server也有变动"。所以下一期我们介绍如何处理DataServerChangeEvent。
前图:
+-------------------------------+
|[DataServerBootstrap] | MetaServerChangeEvent
| |
| +-------------------------+
| startRaftClient | |
| | | +---------------+
| | | | |
+-------------------------------+ | | |
+-------------------------------+ | | |
| [Timer] | | v |
| | | 1 +-------+-----+ |
| ConnectionRefreshMetaTask +------------------------------> | EventCenter +----+ |
| | MetaServerChangeEvent | +-------+-----+ | |
+-------------------------------+ | ^ | |
+-------------------------------+ | | | |
| | | | | |
| [Push<NodeChangeResult>] | | | | |
| | | | | |
| +-------------------------+ | | |
| | MetaServerChangeEvent | | |
| ServerChangeHandler | 2 | | |
| +----------------------------------------+ | |
+-------------------------------+ DataServerChangeEvent | |
| |
| |
MetaServerChangeEvent | |
3 | |
+----------------------------------------------------+ |
| |
v |
+-----------------+--------------+ DataServerChangeEvent |
| | 4 |
| MetaServerChangeEventHandler +------------------------------------------+
| |
+--------------------------------+
手机如下: