  • How HBase replication works

    This article only walks through the overall replication flow. Many details are left out; a follow-up article will look at them more closely.
     
    ReplicationSource startup sequence
    org.apache.hadoop.hbase.regionserver.HRegionServer#startServiceThreads ->
    org.apache.hadoop.hbase.replication.regionserver.Replication#startReplicationService ->
    // Initialize the ReplicationSourceManager
    org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager#init ->
    // During init, a for loop adds a source for every replication peer, so each peer gets its own source and multiple slave clusters can be configured. The peers are read from the ZooKeeper path /hbase/replication/peers
    org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager#addSource ->
    // addSource creates a ReplicationSource and starts it; ReplicationSource is itself a thread
    org.apache.hadoop.hbase.replication.regionserver.ReplicationSource#startup
    // The ReplicationSource thread starts and enters its while loop
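The call chain above boils down to "one source thread per peer". The following is a minimal sketch of that idea only, not the HBase implementation: the class PeerSourceSketch, its nested Source class, and the thread name format are all hypothetical stand-ins, and the peer ids are passed in directly instead of being read from ZooKeeper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of "one source thread per peer"; not HBase code.
class PeerSourceSketch {

    // Stand-in for ReplicationSource, which the article says is itself a thread.
    static class Source extends Thread {
        final String peerId;

        Source(String peerId) {
            super("replicationSource-" + peerId); // assumed naming, for illustration
            this.peerId = peerId;
            setDaemon(true);
        }

        @Override
        public void run() {
            // The real source would loop here, reading WAL entries and shipping them.
        }
    }

    // Mirrors the role of init: loop over all peers and add one source for each.
    static List<Source> init(List<String> peerIds) {
        List<Source> sources = new ArrayList<>();
        for (String peerId : peerIds) {
            sources.add(addSource(peerId));
        }
        return sources;
    }

    // Mirrors the role of addSource: create the source, then start its thread.
    static Source addSource(String peerId) {
        Source source = new Source(peerId);
        source.start();
        return source;
    }

    public static void main(String[] args) {
        for (Source s : init(Arrays.asList("1", "2"))) {
            System.out.println(s.getName() + " handles peer " + s.peerId);
        }
    }
}
```

Because every peer owns a separate thread, a slow or unreachable slave cluster only stalls its own source.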
     
     
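    Rough workflow of a ReplicationSource
    1. The main loop runs as while(isAlive())
    2. Read a List<WAL.Entry> from the WAL file
    3. Hand the entries to the shipEdits method
    4. shipEdits calls the replicate method of the replicationEndpoint
    5. That in turn calls admin.replicateWALEntry, which sends the data over RPC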
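The five steps above can be sketched as a small loop. This is a simplified illustration under stated assumptions, not the HBase code: SourceLoopSketch and its Endpoint interface are hypothetical, WAL entries are modeled as plain strings, the WAL is modeled as a queue of batches, and the loop ends when the queue is empty instead of running while the thread is alive.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the read-then-ship loop; not HBase code.
class SourceLoopSketch {

    // Stand-in for the replication endpoint; the real one sends entries over RPC.
    interface Endpoint {
        boolean replicate(List<String> entries);
    }

    // Ships queued batches in order and returns the number of entries shipped.
    // Stops at the first failed batch and leaves it queued for a later retry.
    static int run(Queue<List<String>> walBatches, Endpoint endpoint) {
        int shipped = 0;
        while (!walBatches.isEmpty()) {               // step 1: main loop
            List<String> entries = walBatches.peek(); // step 2: next batch of entries
            if (!shipEdits(entries, endpoint)) {      // step 3: ship the batch
                break;
            }
            shipped += entries.size();
            walBatches.poll();                        // advance only after success
        }
        return shipped;
    }

    // Steps 4 and 5: delegate the actual sending to the endpoint.
    static boolean shipEdits(List<String> entries, Endpoint endpoint) {
        if (entries.isEmpty()) {
            return true; // nothing to send
        }
        return endpoint.replicate(entries);
    }

    public static void main(String[] args) {
        Queue<List<String>> wal = new ArrayDeque<>();
        wal.add(Arrays.asList("edit-1", "edit-2"));
        wal.add(Arrays.asList("edit-3"));
        List<String> received = new ArrayList<>();
        int shipped = run(wal, received::addAll);
        System.out.println(shipped + " entries shipped: " + received);
    }
}
```

Removing a batch from the queue only after the endpoint reports success is a design choice of this sketch; it keeps unsent entries available for a retry.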
     
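    How a regionserver picks regionservers in the slave cluster as replication targets
    1. Replication has to connect to the peer (the slave cluster), so the first step is to fetch all live regionservers of that peer
    2. With the full regionserver list in hand, the source decides which of them to replicate to
    3. The number of sinks is the number of live regionservers in the peer * ratio (default 0.1), rounded up; the list is shuffled first and then truncated to that length
    4. If no sink is selected (the count is 0), the source keeps waiting for the peer to come online; this is the case where the slave cluster has not been started
    5. The following source code shows how regionservers are chosen as sinks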
      private void connectToPeers() {
        getRegionServers();
    
        int sleepMultiplier = 1;
    
        // Connect to peer cluster first, unless we have to stop
        while (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
          replicationSinkMgr.chooseSinks();
          if (this.isRunning() && replicationSinkMgr.getSinks().size() == 0) {
            if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
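              sleepMultiplier++;     // the multiplier stops growing at maxRetriesMultiplier (default 300), so the longest single sleep is 300 times the base interval, i.e. 300 seconds with a 1-second base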
            }
          }
        }
      }
    
      void chooseSinks() {
        List<ServerName> slaveAddresses = endpoint.getRegionServers();
        Collections.shuffle(slaveAddresses, random);
        int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
        sinks = slaveAddresses.subList(0, numSinks);
        lastUpdateToPeers = System.currentTimeMillis();
        badReportCounts.clear();
      }
    
      /**
       * Do the sleeping logic
       * @param msg Why we sleep
       * @param sleepMultiplier by how many times the default sleeping time is augmented
       * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
       */
      protected boolean sleepForRetries(String msg, int sleepMultiplier) {
        try {
          if (LOG.isTraceEnabled()) {
            LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
          }
          Thread.sleep(this.sleepForRetries * sleepMultiplier);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while sleeping between retries");
        }
        return sleepMultiplier < maxRetriesMultiplier;
      }
    
    this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
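To make the arithmetic in chooseSinks and sleepForRetries concrete, here is a small stand-alone sketch. It is not HBase code: SinkSelectionSketch and its method names are hypothetical, server names are plain strings, and the 1000 ms base sleep used in main is an assumption. With 25 live regionservers and ratio 0.1, ceil(25 * 0.1) = 3 sinks are kept; with a single live regionserver, ceil(0.1) = 1, so a running peer always yields at least one sink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical stand-alone sketch of the sink selection arithmetic; not HBase code.
class SinkSelectionSketch {

    // Same formula as chooseSinks in the listing above: ceil(liveServers * ratio).
    static int numSinks(int liveServers, float ratio) {
        return (int) Math.ceil(liveServers * ratio);
    }

    // Shuffle a copy of the live server list, then keep the first numSinks entries.
    static List<String> chooseSinks(List<String> liveServers, float ratio, Random random) {
        List<String> shuffled = new ArrayList<>(liveServers);
        Collections.shuffle(shuffled, random);
        return new ArrayList<>(shuffled.subList(0, numSinks(shuffled.size(), ratio)));
    }

    // Length of one wait while no sink is available: base interval times multiplier.
    static long sleepMillis(long baseSleepMillis, int sleepMultiplier) {
        return baseSleepMillis * sleepMultiplier;
    }

    public static void main(String[] args) {
        List<String> servers = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            servers.add("rs" + i); // hypothetical server names
        }
        // Prints three randomly chosen server names.
        System.out.println(chooseSinks(servers, 0.1f, new Random()));
        // Longest wait with an assumed 1000 ms base and the default cap of 300.
        System.out.println(sleepMillis(1000L, 300) + " ms");
    }
}
```

Shuffling before truncating means different source regionservers tend to pick different sinks, which spreads the replication load across the slave cluster.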
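    Summary
    1. Each slave cluster has its own ReplicationSource thread, so replication to different slaves does not interfere
    2. Each ReplicationSource ships data on a single thread; shipping concurrently on multiple threads might perform better
    3. Data is sent over RPC by calling the replicateWALEntry method of RSRpcServices on a regionserver in the slave cluster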
  • Original article: https://www.cnblogs.com/yueweimian/p/6520390.html