zoukankan      html  css  js  c++  java
  • HDFS Ozone的Pipeline实现机制

    前言


    在现有HDFS中,每个副本块默认有3个副本,我们都知道这是为了容错而设计的。为了保持这3个副本的数据一致性,HDFS每次对数据进行写操作的时候,都是以Pipeline的方式进行更新。你可以理解为是一种流水线的方式:R1–>R2–>R3。中间如有一个环节出现问题,那么这次Pipeline更新就得重新来过。在HDFS对象存储服务Ozone中,同样会遇到多副本更新一致性的问题,所以它也需要有自己的Pipeline更新机制。此文,笔者就带领大家学习了解Ozone在这方面的实现。

    Ozone的Pipeline概念


    Pipeline概念是一个比较抽象的概念,你可以理解它是一个流水线模型。里面包含若干个“数据位置”信息。在HDFS中,这个Pipeline更新对象是block块,而在Ozone是,针对的数据单元是container。然后Ozone中的container再分配block块出去的。

    在多数据副本的情况下,定义Pipeline的概念有什么用呢?答案是为了更新的数据一致性。前面也已经提过,Pipeline里面会包含目标位置信息,这个信息就是我们想要拿到的。知道这些信息,具体怎么更新,是另一回事,你可以自己实现一个更新策略,或者用外部开源框架实现。

    下面是Ozone中的Pipeline类定义:

    /**
     * A pipeline represents the group of machines over which a container lives.
     */
    public class Pipeline {
      // 容器名称
      private String containerName;
      // 领导者id(外部副本更新机制框架可能会使用到)
      private String leaderID;
      // 数据副本所在节点
      private Map<String, DatanodeID> datanodes;
    
      // pipeline内部允许包含的私有数据
      private byte[] data;
      ...
    }

    Ozone的Pipeline管理


    下面我们来看Pipeline管理,Pipeline是用在做副本更新的,所以这里需要与Ozone现有的副本实现机制挂钩。目前Ozone可提供2种副本机制:

    • 1.完全单副本方式,就是standalone模式。
    • 2.用外部框架Apache Ratis(分布式一致性算法Raft的Java实现)实现多副本方式。

    对应上面2种方式,衍生出2种Pipeline的管理类。下面是Pipeline的基类定义:

    /**
     * Piepline管理接口
     */
    public interface PipelineManager {
    
      /**
       * Pipeline获取方法
       *
       * @param containerName container名称
       * @param replicationFactor - 副本数
       * @return a Pipeline.
       */
      Pipeline getPipeline(String containerName,
          OzoneProtos.ReplicationFactor replicationFactor) throws IOException;
    
      /**
       * 创建Pipeline
       */
      void createPipeline(String pipelineID, List<DatanodeID> datanodes)
          throws IOException;;
    
      /**
       * 关闭Pipeline
       */
      void closePipeline(String pipelineID) throws IOException;
    
      /**
       * 列出Pipeline中的成员节点
       * @return the datanode
       */
      List<DatanodeID> getMembers(String pipelineID) throws IOException;
    
      /**
       * 用新节点更新Pipeline
       */
      void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes)
          throws IOException;
    }

    针对第一种单副本的方式,作者是用了一个策略类来进行Pipeline节点的选择的。如下代码:

    public class StandaloneManagerImpl implements PipelineManager {
      private final NodeManager nodeManager;
      private final ContainerPlacementPolicy placementPolicy;
      private final long containerSize;
      ...
    
      public Pipeline getPipeline(String containerName, OzoneProtos
          .ReplicationFactor replicationFactor) throws IOException {
        // 根据策略类进行指定数量节点的选择
        List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
            replicationFactor.getNumber(), containerSize);
        // 根据选中节点,进行Pipeline对象的构造并返回
        return newPipelineFromNodes(datanodes, containerName);
      }
    }

    这里我们重点来看Pipeline如何进行构造的,进入newPipelineFromNodes方法。

      private static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes,
          final String containerName) {
        // 检查候选节点的数量
        Preconditions.checkNotNull(nodes);
        Preconditions.checkArgument(nodes.size() > 0);
        // 选取第一个节点为领导节点
        String leaderId = nodes.get(0).getDatanodeUuid();
        Pipeline pipeline = new Pipeline(leaderId);
        // 进行成员节点的添加
        for (DatanodeID node : nodes) {
          pipeline.addMember(node);
        }
    
        // 再设置container名称
        pipeline.setContainerName(containerName);
        // 对象构造完毕,对象返回
        return pipeline;
      }

    同样对于Ratis的副本更新方式,也有对应的实现子类:RatisManagerImpl。它内部Pipeline获取方式如下:

    public synchronized Pipeline getPipeline(String containerName,
           OzoneProtos.ReplicationFactor replicationFactor) {
        /**
         * 在ratis的方式下,
         *
         * 1. 如果有空闲的节点,创建一个全新的Pipeline(不会用到策略类)
         *
         * 2. 如果没有空闲节点,以轮询的方式复用现有的Pipeline.
         */
        Pipeline pipeline = null;
        List<DatanodeID> newNodes = allocatePipelineNodes(replicationFactor);
        if (newNodes != null) {
          Preconditions.checkState(newNodes.size() ==
              getReplicationCount(replicationFactor), "Replication factor " +
              "does not match the expected node count.");
          pipeline = allocateRatisPipeline(newNodes, containerName);
        } else {
          pipeline = findOpenPipeline();
        }
        if (pipeline == null) {
          LOG.error("Get pipeline call failed. We are not able to find free nodes" +
              " or operational pipeline.");
        }
        return pipeline;
      }

    这里具体细节笔者就不展开了。

    在Ozone中,Pipeline是通过调用Pipeline选择器来创建Pipeline。代码如下:

      public Pipeline getReplicationPipeline(ReplicationType replicationType,
          OzoneProtos.ReplicationFactor replicationFactor, String containerName)
          throws IOException {
        // 根据副本机制类型,获取Pipeline管理类
        PipelineManager manager = getPipelineManager(replicationType);
        Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
        LOG.debug("Getting replication pipeline for {} : Replication {}",
            containerName, replicationFactor.toString());
        // 传入副本数进行Pipeline的获取
        return manager.getPipeline(containerName, replicationFactor);
      }
    
      private PipelineManager getPipelineManager(ReplicationType replicationType)
          throws IllegalArgumentException {
        switch(replicationType){
        case RATIS:
          return this.ratisManager;
        case STAND_ALONE:
          return this.standaloneManager;
        case CHAINED:
          throw new IllegalArgumentException("Not implemented yet");
        default:
          throw new IllegalArgumentException("Unexpected enum found. Does not" +
              " know how to handle " + replicationType.toString());
        }
    
      }

    最后在创建container的时候会触发到创建Pipeline的操作,代码如下:

      public ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
          OzoneProtos.ReplicationFactor replicationFactor,
          final String containerName) throws IOException {
        ...
    
        lock.lock();
        try {
          byte[] containerBytes =
              containerStore.get(containerName.getBytes(encoding));
          if (containerBytes != null) {
            throw new SCMException("Specified container already exists. key : " +
                containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
          }
          // 进行Pipeline对象的创建 
          Pipeline pipeline = pipelineSelector.getReplicationPipeline(type,
              replicationFactor, containerName);
          // pipeline信息设置到Container的信息中
          containerInfo = new ContainerInfo.Builder()
              .setState(OzoneProtos.LifeCycleState.ALLOCATED)
              .setPipeline(pipeline)
              .setStateEnterTime(Time.monotonicNow())
              .build();
          containerStore.put(containerName.getBytes(encoding),
              containerInfo.getProtobuf().toByteArray());
        } finally {
          lock.unlock();
        }
        return containerInfo;
      }

    在后续涉及到container的操作,都会用到这个Pipeline的对象信息。从上面还可以看出,Ozone在副本的设计上做的还是很灵活的,可以调整使用的策略,允许外部框架的介入,而不是说完全就自己设计死了一套,这里依靠2个概念的引入:ReplicationType和ReplicationFactor。笔者觉得这块设计确实令人眼前一亮。

    Ozone利用Ratis实现副本一致性


    最后我们来学习Ozone是如何借助外部框架实现副本数据一致性的,之前提到过Apache Ratis可以帮我们实现副本一致性,那么在Ozone它是如何使用的Ratis的呢?

    其实使用的要点很简单:告诉它领导节点和成员节点即可,其它如何实现一致性要求,就是框架内部做的事情了。

    而上述提到的领导节点和成员节点,在Pipeline定义是可以得到的。我们看下面的相关代码:

      // 根据Pipeline信息对象初始化Raft客户端对象
      static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
        // 传入信息,领导节点,其它成员节点对
        return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
            toRaftPeers(pipeline));
      }
      // 成员节点信息对
      static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
        return pipeline.getMachines().stream()
            .map(RatisHelper::toRaftPeer)
            .collect(Collectors.toList());
      }

    构造好这个Raft客户端之后,所有针对此Pipeline更新的操作都将由此客户端来调用,并且能保证更新的一致性。

  • 相关阅读:
    EasyUI的datagrid在IE下解决缓存的方案
    [置顶] 【Mybatis】---mybatis+mysql+ IntelliJ IDEA框架搭建+实例讲解
    【深入分析JavaWeb】-DNS域名解析
    hibernate对JPA_Annotation的支持实例讲解
    【hibernate进阶】hql简单属性查询
    LeetCode 10. 正则表达式匹配
    机器学习(Machine Learning)- 吴恩达(Andrew Ng) 学习笔记(七)
    机器学习(Machine Learning)- 吴恩达(Andrew Ng) 学习笔记(六)
    机器学习(Machine Learning)- 吴恩达(Andrew Ng) 学习笔记(五)
    机器学习(Machine Learning)- 吴恩达(Andrew Ng) 学习笔记(四)
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183657.html
Copyright © 2011-2022 走看看