zoukankan      html  css  js  c++  java
  • tfs数据复制策略—源码解读

         通常在下面两种情况下,会发生数据复制操作:1 由于节点故障导致该节点上的block数据均丢失 ;2 有新增加的节点时,由于新旧节点上的磁盘利用率不平衡时。

    1 节点故障

         在tfs里面,nameserver会启动心跳线程,定期扫描所有logicblock的状态信息(数据副本版本一致性、副本个数、磁盘空间利用率等)。例如:当一个block的空间利用率小于一个定阈值时,发生数据compact磁盘回收操作;当一个block的副本数小于规定的大小时,会发生数据replication操作。
         由于数据节点不可用导致该节点上的block的副本数都变小,而每个block的副本总数量通常为3,这时会发生数据复制操作,代码逻辑如下: 

        bool BlockManager::need_replicate(ArrayHelper<uint64_t>& servers, PlanPriority& priority, const BlockCollect* block, const time_t now) const
        {
          bool ret = NULL != block;
          if (ret)
          {
            get_mutex_(block->id()).rdlock();
            priority = block->check_replicate(now);
            ret = (priority >= PLAN_PRIORITY_NORMAL);
            if (ret)
              block->get_servers(servers);
            get_mutex_(block->id()).unlock();
            ret = (ret && !manager_.get_task_manager().exist_block(block->id()));
          }
          return ret;
        }    
    
        PlanPriority BlockCollect::check_replicate(const time_t now) const
        {
          PlanPriority priority = PLAN_PRIORITY_NONE;
          if (!is_creating() && !is_in_family() && !in_replicate_queue() && expire(now) && !has_valid_lease(now))
          {
            if (server_size_ <= 0)
            {
              TBSYS_LOG(WARN, "block: %"PRI64_PREFIX"u has been lost, do not replicate", info_.block_id_);
            }
            else
            {
              if (server_size_ < SYSPARAM_NAMESERVER.max_replication_)
                priority = PLAN_PRIORITY_NORMAL;
              if (1 == server_size_ && SYSPARAM_NAMESERVER.max_replication_ > 1)
                priority = PLAN_PRIORITY_EMERGENCY;
            }
          }
          return priority;
        }

    2 节点扩容

         随着数据规模总量的增加,集群扩容也必不可少了。当有新的数据节点加入时,集群自动检测所有数据节点上的block负载和迁移,来达到节点间的均衡。数据迁移的具体问题描述:哪个数据节点源上的哪些block数据,需要迁移到哪个数据节点目的地上?因此需要解决两个问题:

        a. 敲定待迁移数据的源头节点soure和目的节点dest?

           首先,nameserver会获取所有数据节点的负载率,算出集群的数据平均负载率;数据节点负载利用率公式: avg_ratio = (use_capacity)/(total_capacity);

           然后,随机选择低于平均负载率的数据节点作为目的节点;反之,选择高于平均负载率的数据节点作为源头节点。
        b. 敲定源头数据节点上的哪些block_id需要迁移?
           首先,计算源头数据节点上的所有block的活跃值;活跃值公式表示:

    weights = th.last_statistics_time_ * ar.last_access_time_ratio + th.read_visit_count_ * ar.read_ratio + th.write_visit_count_ * ar.write_ratio +th.update_visit_count_ * ar.update_ratio + th.unlink_visit_count_* ar.unlink_ratio;

           然后,选择活跃值最低的block数据作为待迁移的block。

        代码逻辑如下: 

    void MigrateManager::run_()
         {
          int64_t index  = 0;
          const int32_t MAX_SLEEP_TIME = 30;//30s
          const int32_t MAX_ARRAY_SIZE = 128;
          const int32_t CHECK_COMPLETE_WAIT_TIME = 120;//120s
    
          std::pair<uint64_t, int32_t> array[MAX_ARRAY_SIZE];
          common::ArrayHelper<std::pair<uint64_t, int32_t>> helper(MAX_ARRAY_SIZE, array);
          migrateserver::MsRuntimeGlobalInformation& mrgi= migrateserver::MsRuntimeGlobalInformation::instance();
          while (!mrgi.is_destroyed())
          {
            helper.clear();
            blocks_[0].clear();
            blocks_[1].clear();
    
            MigrateEntry entry;
            memset(&entry, 0, sizeof(entry));
            calc_system_disk_migrate_info_(entry);
            if (entry.source_addr_ != INVALID_SERVER_ID
                || entry.dest_addr_ != INVALID_SERVER_ID)
            {
              get_all_servers_(helper);
              for (index = 0; index < helper.get_array_index(); ++index)
              {
                std::pair<uint64_t, int32_t>* item = helper.at(index);
                get_index_header_(item->first, item->second);
              }
              int32_t ret = choose_migrate_entry_(entry);
              if (TFS_SUCCESS == ret)
              {
                ret = do_migrate_(entry);
              }
              if (TFS_SUCCESS == ret)
              {
                Func::sleep(CHECK_COMPLETE_WAIT_TIME, mrgi.is_destroy_);
              }
            }
            Func::sleep(MAX_SLEEP_TIME, mrgi.is_destroy_);
          }
        }
         
        //a. 敲定源和目的数据节点
        void MigrateManager::calc_system_disk_migrate_info_(MigrateEntry& entry) const
        {
          memset(&entry, 0, sizeof(entry));
          int64_t total_capacity = 0, use_capacity = 0;
          statistic_all_server_info_(total_capacity, use_capacity);
          if (total_capacity > 0 && use_capacity > 0)
          {
            double avg_ratio = static_cast<double>(use_capacity)/static_cast<double>(total_capacity);
            tbutil::Mutex::Lock lock(mutex_);
            CONST_SERVER_MAP_ITER iter = servers_.begin();
            for (; iter != servers_.end(); ++iter)
            {
              const common::DataServerStatInfo& info = iter->second;
              if (INVALID_SERVER_ID != info.id_ && common::DATASERVER_DISK_TYPE_SYSTEM == info.type_
                  && info.total_capacity_ > 0)
              {
                double curr_ratio = static_cast<double>(info.use_capacity_) / static_cast<double>(info.total_capacity_);
    
                if (curr_ratio < avg_ratio - balance_percent_)
                {
                  entry.dest_addr_ = info.id_;
                }
                else if ((curr_ratio > (avg_ratio + balance_percent_))
                    || curr_ratio >= 1.0)
                {
                  entry.source_addr_ = info.id_;
                }
              }
            }
          }
        }
        
        //b. 敲定源数据节点上的block_id
        int64_t MigrateManager::calc_block_weight_(const common::IndexHeaderV2& info, const int32_t type) const
        {
          int64_t weights = -1;
          const int64_t now = time(NULL);
          const AccessRatio &ar = DATASERVER_DISK_TYPE_SYSTEM == type ? system_disk_access_ratio_ : full_disk_access_ratio_;
          const ThroughputV2 &th = info.throughput_;
          bool calc = common::DATASERVER_DISK_TYPE_SYSTEM == type ? true :
              (th.last_statistics_time_ + hot_time_range_ < now && is_full(info.info_));
          if (calc)
          {
            weights = th.last_statistics_time_ * ar.last_access_time_ratio +
                th.read_visit_count_ * ar.read_ratio + th.write_visit_count_ * ar.write_ratio +
                th.update_visit_count_ * ar.update_ratio + th.unlink_visit_count_* ar.unlink_ratio;
          }
          return weights;
        }
        
        //c. 发送“迁移任务”给数据节点,开始具体的迁移
        int MigrateManager::do_migrate_(MigrateEntry& current)
        {
          char msg[256] = {''};
          int32_t ret = (current.block_id_ != INVALID_BLOCK_ID
              && current.source_addr_ != INVALID_SERVER_ID
              && current.dest_addr_ != INVALID_SERVER_ID) ? TFS_SUCCESS : EXIT_PARAMETER_ERROR;
          if (TFS_SUCCESS == ret)
          {
            ClientCmdMessage req_msg;
            req_msg.set_value1(current.source_addr_);
            req_msg.set_value2(current.dest_addr_);
            req_msg.set_value3(current.block_id_);
            req_msg.set_value4(REPLICATE_BLOCK_MOVE_FLAG_YES);
            req_msg.set_value5(MOVE_BLOCK_NO_CHECK_RACK_FLAG_YES);
            req_msg.set_cmd(CLIENT_CMD_IMMEDIATELY_REPL);
            int32_t retry_times = 3;
            const int32_t TIMEOUT_MS = 2000;
            do
            {
              NewClient* client = NewClientManager::get_instance().create_client();
              ret = (NULL != client) ? TFS_SUCCESS : EXIT_CLIENT_MANAGER_CREATE_CLIENT_ERROR;
              if (TFS_SUCCESS == ret)
              {
                tbnet::Packet* result = NULL;
                ret = send_msg_to_server(ns_vip_port_, client, &req_msg, result, TIMEOUT_MS);
                if (TFS_SUCCESS == ret)
                {
                  ret = STATUS_MESSAGE == result->getPCode() ? TFS_SUCCESS : EXIT_SEND_MIGRATE_MSG_ERROR;
                }
                if (TFS_SUCCESS == ret)
                {
                  StatusMessage* rsp = dynamic_cast<StatusMessage*>(result);
                  int32_t len = std::min(static_cast<int32_t>(rsp->get_error_msg_length()), 256);
                  len = std::max(0, len);
                  strncpy(msg, rsp->get_error(), len);
                  ret = STATUS_MESSAGE_OK == rsp->get_status() ? TFS_SUCCESS : EXIT_SEND_MIGRATE_MSG_ERROR;
                }
              }
              NewClientManager::get_instance().destroy_client(client);
            }
            while (retry_times-- > 0 && TFS_SUCCESS != ret);
          }
          return ret;
        }
    

      

  • 相关阅读:
    ssh连接虚拟机centos
    centos安装vim
    CentOS 使用yum命令安装出现错误提示”could not retrieve mirrorlist http://mirrorlist.centos.org
    java多线程之yield,join,wait,sleep的区别
    mybatis分页插件pagehelper
    kaptcha验证码插件使用与参数
    redis主从简单配置
    从本地新建项目到提交到github
    Linux服务器安装rocketMQ单机消息队列
    Oracle通过命令导入数据存储文件
  • 原文地址:https://www.cnblogs.com/gisorange/p/4905475.html
Copyright © 2011-2022 走看看