前言
在现有HDFS中,每个副本块默认有3个副本,我们都知道这是为了容错而设计的。为了保持这3个副本的数据一致性,HDFS每次对数据进行写操作的时候,都是以Pipeline的方式进行更新。你可以理解为是一种流水线的方式:R1–>R2–>R3。中间如有一个环节出现问题,那么这次Pipeline更新就得重新来过。在HDFS对象存储服务Ozone中,同样会遇到多副本更新一致性的问题,所以它也需要有自己的Pipeline更新机制。此文,笔者就带领大家学习了解Ozone在这方面的实现。
Ozone的Pipeline概念
Pipeline概念是一个比较抽象的概念,你可以理解它是一个流水线模型。里面包含若干个“数据位置”信息。在HDFS中,这个Pipeline更新对象是block块,而在Ozone是,针对的数据单元是container。然后Ozone中的container再分配block块出去的。
在多数据副本的情况下,定义Pipeline的概念有什么用呢?答案是为了更新的数据一致性。前面也已经提过,Pipeline里面会包含目标位置信息,这个信息就是我们想要拿到的。知道这些信息,具体怎么更新,是另一回事,你可以自己实现一个更新策略,或者用外部开源框架实现。
下面是Ozone中的Pipeline类定义:
/**
* A pipeline represents the group of machines over which a container lives.
*/
public class Pipeline {
// 容器名称
private String containerName;
// 领导者id(外部副本更新机制框架可能会使用到)
private String leaderID;
// 数据副本所在节点
private Map<String, DatanodeID> datanodes;
// pipeline内部允许包含的私有数据
private byte[] data;
...
}
Ozone的Pipeline管理
下面我们来看Pipeline管理,Pipeline是用在做副本更新的,所以这里需要与Ozone现有的副本实现机制挂钩。目前Ozone可提供2种副本机制:
- 1.完全单副本方式,就是standalone模式。
- 2.用外部框架Apache Ratis(分布式一致性算法Raft的Java实现)实现多副本方式。
对应上面2种方式,衍生出2种Pipeline的管理类。下面是Pipeline的基类定义:
/**
* Piepline管理接口
*/
public interface PipelineManager {
/**
* Pipeline获取方法
*
* @param containerName container名称
* @param replicationFactor - 副本数
* @return a Pipeline.
*/
Pipeline getPipeline(String containerName,
OzoneProtos.ReplicationFactor replicationFactor) throws IOException;
/**
* 创建Pipeline
*/
void createPipeline(String pipelineID, List<DatanodeID> datanodes)
throws IOException;;
/**
* 关闭Pipeline
*/
void closePipeline(String pipelineID) throws IOException;
/**
* 列出Pipeline中的成员节点
* @return the datanode
*/
List<DatanodeID> getMembers(String pipelineID) throws IOException;
/**
* 用新节点更新Pipeline
*/
void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes)
throws IOException;
}
针对第一种单副本的方式,作者是用了一个策略类来进行Pipeline节点的选择的。如下代码:
public class StandaloneManagerImpl implements PipelineManager {
private final NodeManager nodeManager;
private final ContainerPlacementPolicy placementPolicy;
private final long containerSize;
...
public Pipeline getPipeline(String containerName, OzoneProtos
.ReplicationFactor replicationFactor) throws IOException {
// 根据策略类进行指定数量节点的选择
List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
replicationFactor.getNumber(), containerSize);
// 根据选中节点,进行Pipeline对象的构造并返回
return newPipelineFromNodes(datanodes, containerName);
}
}
这里我们重点来看Pipeline如何进行构造的,进入newPipelineFromNodes方法。
private static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes,
final String containerName) {
// 检查候选节点的数量
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
// 选取第一个节点为领导节点
String leaderId = nodes.get(0).getDatanodeUuid();
Pipeline pipeline = new Pipeline(leaderId);
// 进行成员节点的添加
for (DatanodeID node : nodes) {
pipeline.addMember(node);
}
// 再设置container名称
pipeline.setContainerName(containerName);
// 对象构造完毕,对象返回
return pipeline;
}
同样对于Ratis的副本更新方式,也有对应的实现子类:RatisManagerImpl。它内部Pipeline获取方式如下:
public synchronized Pipeline getPipeline(String containerName,
OzoneProtos.ReplicationFactor replicationFactor) {
/**
* 在ratis的方式下,
*
* 1. 如果有空闲的节点,创建一个全新的Pipeline(不会用到策略类)
*
* 2. 如果没有空闲节点,以轮询的方式复用现有的Pipeline.
*/
Pipeline pipeline = null;
List<DatanodeID> newNodes = allocatePipelineNodes(replicationFactor);
if (newNodes != null) {
Preconditions.checkState(newNodes.size() ==
getReplicationCount(replicationFactor), "Replication factor " +
"does not match the expected node count.");
pipeline = allocateRatisPipeline(newNodes, containerName);
} else {
pipeline = findOpenPipeline();
}
if (pipeline == null) {
LOG.error("Get pipeline call failed. We are not able to find free nodes" +
" or operational pipeline.");
}
return pipeline;
}
这里具体细节笔者就不展开了。
在Ozone中,Pipeline是通过调用Pipeline选择器来创建Pipeline。代码如下:
public Pipeline getReplicationPipeline(ReplicationType replicationType,
OzoneProtos.ReplicationFactor replicationFactor, String containerName)
throws IOException {
// 根据副本机制类型,获取Pipeline管理类
PipelineManager manager = getPipelineManager(replicationType);
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Getting replication pipeline for {} : Replication {}",
containerName, replicationFactor.toString());
// 传入副本数进行Pipeline的获取
return manager.getPipeline(containerName, replicationFactor);
}
private PipelineManager getPipelineManager(ReplicationType replicationType)
throws IllegalArgumentException {
switch(replicationType){
case RATIS:
return this.ratisManager;
case STAND_ALONE:
return this.standaloneManager;
case CHAINED:
throw new IllegalArgumentException("Not implemented yet");
default:
throw new IllegalArgumentException("Unexpected enum found. Does not" +
" know how to handle " + replicationType.toString());
}
}
最后在创建container的时候会触发到创建Pipeline的操作,代码如下:
public ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor,
final String containerName) throws IOException {
...
lock.lock();
try {
byte[] containerBytes =
containerStore.get(containerName.getBytes(encoding));
if (containerBytes != null) {
throw new SCMException("Specified container already exists. key : " +
containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
}
// 进行Pipeline对象的创建
Pipeline pipeline = pipelineSelector.getReplicationPipeline(type,
replicationFactor, containerName);
// pipeline信息设置到Container的信息中
containerInfo = new ContainerInfo.Builder()
.setState(OzoneProtos.LifeCycleState.ALLOCATED)
.setPipeline(pipeline)
.setStateEnterTime(Time.monotonicNow())
.build();
containerStore.put(containerName.getBytes(encoding),
containerInfo.getProtobuf().toByteArray());
} finally {
lock.unlock();
}
return containerInfo;
}
在后续涉及到container的操作,都会用到这个Pipeline的对象信息。从上面还可以看出,Ozone在副本的设计上做的还是很灵活的,可以调整使用的策略,允许外部框架的介入,而不是说完全就自己设计死了一套,这里依靠2个概念的引入:ReplicationType和ReplicationFactor。笔者觉得这块设计确实令人眼前一亮。
Ozone利用Ratis实现副本一致性
最后我们来学习Ozone是如何借助外部框架实现副本数据一致性的,之前提到过Apache Ratis可以帮我们实现副本一致性,那么在Ozone它是如何使用的Ratis的呢?
其实使用的要点很简单:告诉它领导节点和成员节点即可,其它如何实现一致性要求,就是框架内部做的事情了。
而上述提到的领导节点和成员节点,在Pipeline定义是可以得到的。我们看下面的相关代码:
// 根据Pipeline信息对象初始化Raft客户端对象
static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
// 传入信息,领导节点,其它成员节点对
return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
toRaftPeers(pipeline));
}
// 成员节点信息对
static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
return pipeline.getMachines().stream()
.map(RatisHelper::toRaftPeer)
.collect(Collectors.toList());
}
构造好这个Raft客户端之后,所有针对此Pipeline更新的操作都将由此客户端来调用,并且能保证更新的一致性。