zoukankan      html  css  js  c++  java
  • OneFlow: 启动 Session

    前言

    前面在初始化 Session 的时候,通过 CurJobAddOp 将 Op 加入到计算图当中,实际上只是将 Op 加入到 Job 里面,而 Job 只是一个 Protobuf Message 罢了。如果用户定义了多个 Job,那么这些 Job 就会构成一个 JobSet。用户将算子添加完之后,就会调用 Complete 对计算图 (其实就是 Job) 进行优化改写。接下来就是启动 Session,启动 Session 的时候进行了什么重要的事情呢?这篇文章就来分析一下。

    • 结论:启动 Session 的时候将逻辑上的 Job 编译为物理上的 Plan,启动 Runtime 去执行 Plan。

    流程回顾

    在 Session 初始化的时候,我们可以看到先调用了 InitLazyGlobalSession,然后调用 compiler.Compile 将 Op 逐个加入计算图,接下来就是启动 Session,调用 StartLazyGlobalSession。StartLazyGlobalSession 背后做了什么操作呢?

    # python/oneflow/compatible/single_client/framework/session_util.py: 183
    def Init(self):
        assert self.status_ is SessionStatus.OPEN
        self.status_ = SessionStatus.RUNNING
        if not oneflow._oneflow_internal.IsEnvInited():
            flow.env.init()
        _TryCompleteConfigProto(self.config_proto)
        self.resource_ = self.config_proto.resource
        if not oneflow._oneflow_internal.EagerExecutionEnabled():
            c_api_util.InitLazyGlobalSession(self.config_proto)
            for (job_name, func_desc) in self.job_name2function_desc_.items():
                compiler.Compile(self, func_desc, self.config_proto)
                self.existed_module_names_ = set()
            self.job_name2var_name2var_blob_ = dict()
            assert len(self.job_name2function_desc_.items()) > 0
            oneflow._oneflow_internal.StartLazyGlobalSession()
            self.inter_user_job_info_ = c_api_util.GetInterUserJobInfo()
            self.UpdateInfo4InterfaceOp()
            if not config_util.api_legacy_model_io_enabled():
                check_point_v2.Init()
        else:
            self.eager_config_proto_ctx_ = oneflow._oneflow_internal.LogicalConfigProtoContext(
                str(self.config_proto)
            )
        return self
    

    StartLazyGlobalSession

    回想一下,我们进入这个方法之前的状态,我们有一个 JobBuildAndInferCtxMgr,里面存有 JobSet。用户定义一个 Job,就 JobSet 就多一个 Job。这个 Job 的状态是什么样子的呢?这个 Job 是用户定义的 Job 函数转化过来的,并且经过了 CurJobBuildAndInferCtx_Complete 优化改写了。

    StartLazyGlobalSession 启动 Session,背后做了什么呢?带着问题单步调试跟踪进去看一看。

    • 在 StartLazyGlobalSession 中获取 JobSet,从 JobBuildAndInferCtxMgr 直接拿到。这个细节很重要,JobSet 是承接上一个部分的线索。其实第二篇文章分析 Python 端构图的时候,没有深入 CurJobAddOp 去,因为里面涉及到了 SBP 的推导等。JobSet 是一个 Protobuf message,它的成员是可重复的 Job。JobSet 有 LazyJobBuildAndInferCtxMgr 进行管理,在打开一个 JobBuildAndInferCtx 的时候,会在 JobSet 中新增一个 Job,然后将 Job 传给 JobBuildAndInferCtx。
    • StartLazyGlobalSession 中最重要的操作是创建一个全局的 Oneflow 对象,然后使用 JobSet 去初始化这个 Oneflow 对象。JobSet 会这个过程中编译成 Plan,然后启动 Runtime。
    // oneflow/api/python/session/session.h: 88
    inline Maybe<void> StartLazyGlobalSession() {
      CHECK_NOTNULL_OR_RETURN(Global<SessionGlobalObjectsScope>::Get()) << "session not found";
      CHECK_OR_RETURN(GlobalProcessCtx::IsThisProcessMaster());
      const JobSet& job_set = Global<LazyJobBuildAndInferCtxMgr>::Get()->job_set();
      if (Global<ResourceDesc, ForSession>::Get()->enable_debug_mode()) {
        TeePersistentLogStream::Create("job_set.prototxt")->Write(job_set);
      }
      if (job_set.job().empty()) { return Error::JobSetEmptyError() << "no function defined"; }
      CHECK_ISNULL_OR_RETURN(Global<Oneflow>::Get());
      Global<CtrlClient>::Get()->PushKV("session_job_set", job_set);
      Global<const InterJobReuseMemStrategy>::New(job_set.inter_job_reuse_mem_strategy());
      Global<Oneflow>::New();
      JUST(Global<Oneflow>::Get()->Init(job_set));
      return Maybe<void>::Ok();
    }
    
    // oneflow/core/job/job_build_and_infer_ctx_mgr.h: 38
    class JobBuildAndInferCtxMgr {
     public:
      // ...
      const JobSet& job_set() const { return job_set_; }
      // ...
    }
    
    • 在 Oneflow 全局对象在初始化的过程中,调用 CompileJobsAndPushMergedPlan 将 Job 编译为 MergedPlan。如果不是 Master 节点,那么不会进行编译,会调用 PullPlan 从 Master 拉取 Plan。最后使用 Plan 初始化 Runtime。
    // oneflow/core/job/oneflow.cpp: 1005
    Maybe<void> Oneflow::Init(const oneflow::JobSet& job_set) {
      OF_PROFILER_RANGE_GUARD("Oneflow::Init");
      // Runtime
      OF_PROFILER_RANGE_PUSH("CompileJobsAndPushMergedPlan");
      JUST(CompileJobsAndPushMergedPlan(job_set.job()));
      OF_PROFILER_RANGE_POP();  // CompileJobsAndPushMergedPlan
      double start = GetCurTime();
      PullPlan("merged_plan", &plan_);
      LOG(INFO) << " PullPlan merged_plan time: " << (GetCurTime() - start) / 1e9 << " seconds.
    ";
      if (GlobalProcessCtx::IsThisProcessMaster()) {
        runtime_buffers_scope_.reset(new RuntimeBuffersScope(plan_.job_confs()));
      }
      OF_PROFILER_RANGE_PUSH("new Runtime");
      if (Global<ResourceDesc, ForSession>::Get()->enable_dry_run()) {
        LOG(ERROR) << "this is dry run, exiting";
        exit(0);
      }
    
      HashMap<std::string, Blob*> variable_op_name2eager_blob;
      runtime_.reset(new Runtime(plan_, variable_op_name2eager_blob));
      OF_PROFILER_RANGE_POP();  // new Runtime
      return Maybe<void>::Ok();
    }
    

    编译 Job 为 MergedPlan

    • CompileJobsAndPushMergedPlan 输入是 PbRfp,它是 Protobuf Repeated Field Ptr 的意思,可以认为这个函数的输入就是一个 Job 数组。这个方法,只由 Master 节点执行,即由 Master 生成 Plan。
    // oneflow/core/job/oneflow.cpp: 985
    Maybe<void> CompileJobsAndPushMergedPlan(const PbRpf<Job>& job_confs) {
      if (GlobalProcessCtx::IsThisProcessMaster()) {
        Plan plan;
        JUST(CompileJobsAndMergePlans(job_confs, plan));
        double start = GetCurTime();
        // push op_attribute_info
        OpAttributeInfo op_attribute_info;
        *op_attribute_info.mutable_job_id2op_attribute_ref_table() =
            plan.job_id2op_attribute_ref_table();
        Global<CtrlClient>::Get()->PushKV("op_attribute_info", op_attribute_info);
        // push plan
        PushPlan("merged_plan", std::move(plan));
        LOG(INFO) << " PushPlan merged_plan time: " << (GetCurTime() - start) / 1e9 << " seconds.
    ";
      }
      OF_SESSION_BARRIER();
      return Maybe<void>::Ok();
    }
    
    • CompileJobsAndMergePlans 在上面的流程中,仅仅只是不断调用方法,实际上什么都还没干呢。下面这个方法,开始干活了。不过这篇文章暂时不深入细节,重要的是先理清楚流程。

    下面的代码有点长,这个方法主要做的事情有:

    • 添加 Model IO Job
    • 添加 Push Job 和 Pull Job
    • CompileCurJobOnMaster 逐个编译 Job,MergeSubPlan 将 Job 合并
    • Job 之间的内存复用和内存共享
    • FinishGlobalCriticalSectionDesc 划分临界区
    • MainJob 的生成、编译、链接
    // oneflow/core/job/oneflow.cpp: 912
    Maybe<void> CompileJobsAndMergePlans(const PbRpf<Job>& job_confs, Plan& plan) {
      std::vector<std::shared_ptr<Job>> jobs(job_confs.size());
      FOR_RANGE(int, i, 0, jobs.size()) { jobs.at(i).reset(new Job(job_confs.Get(i))); }
      if (jobs.size() > 1) { CheckNonDistributeOptimizerAvailable(jobs); }
      HashMap<std::string, ParallelBlobConf> var_op_name2parallel_blob_conf;
      FilterOpName2ParallelBlobConf({OperatorConf::kVariableConf}, jobs,
                                    &var_op_name2parallel_blob_conf);
      auto AppendJob = [&](Job* job) {
        JobDesc job_desc(job->job_conf(), jobs.size());
        CHECK(!job_desc.Bool("__is_user_function__"));
        jobs.emplace_back(new Job(*job));
      };
    
      if (Global<ResourceDesc, ForSession>::Get()->resource().enable_legacy_model_io()) {
        if (Global<ResourceDesc, ForSession>::Get()->resource().enable_model_io_v2()) {
          MakeModelIoV2Jobs(jobs, var_op_name2parallel_blob_conf, AppendJob);
        } else {
          MakeModelIoJobs(jobs, var_op_name2parallel_blob_conf, AppendJob);
        }
      }
      std::vector<std::shared_ptr<Job>> function_jobs;
      function_jobs.reserve(jobs.size());
      FOR_RANGE(int, i, 0, jobs.size()) {
        JobDesc job_desc(jobs.at(i)->job_conf(), i);
        if (job_desc.Bool("__is_user_function__")) { function_jobs.push_back(jobs.at(i)); }
      }
      HashMap<std::string, ParallelBlobConf> push_op_name2parallel_blob_conf;
      FilterOpName2ParallelBlobConf({OperatorConf::kInputConf}, function_jobs,
                                    &push_op_name2parallel_blob_conf);
      HashMap<std::string, ParallelBlobConf> pull_op_name2parallel_blob_conf;
      FilterOpName2ParallelBlobConf({OperatorConf::kReturnConf}, function_jobs,
                                    &pull_op_name2parallel_blob_conf);
      for (const auto& pair : push_op_name2parallel_blob_conf) {
        auto push_job = std::make_shared<Job>();
        MakePushJob(std::string("System-Push-") + pair.first, pair.first, pair.second, push_job.get());
        jobs.emplace_back(push_job);
      }
      for (const auto& pair : pull_op_name2parallel_blob_conf) {
        auto pull_job = std::make_shared<Job>();
        MakePullJob(std::string("System-Pull-") + pair.first, pair.first, pair.second, pull_job.get());
        jobs.emplace_back(pull_job);
      }
    
      std::vector<Plan> sub_plans(jobs.size());
      FOR_RANGE(int64_t, i, 0, jobs.size()) {
        AddJobName2JobId(jobs.at(i)->job_conf().job_name(), i);
        auto scope = std::make_unique<GlobalJobDescScope>(jobs.at(i)->job_conf(), i);
        JUST(CompileCurJobOnMaster(jobs.at(i).get(), &sub_plans.at(i), true));
      }
      MergeSubPlan(&plan, std::move(sub_plans));
      InterJobMemSharingUtil::MergeMemReusedChunkBetweenUserJobs(function_jobs, &plan);
      InterJobMemSharingUtil::MergeMemSharedInterfaceMemBlockBetweenJobs(jobs, &plan);
      PlanUtil::SetForceInplaceMemBlock(&plan);
      FinishGlobalCriticalSectionDesc(plan, jobs.size());
      Plan main_plan;
      std::vector<std::map<int64_t, std::string>> identity_tick_op_names;
      {
        Job main_job;
        std::vector<ReentrantLockBackEdge> lock_back_edges;
        JUST(MakeMainJob(&main_job, &identity_tick_op_names, &lock_back_edges));
        AddJobName2JobId(main_job.job_conf().job_name(), jobs.size());
        JUST(CompileMainJob(&main_job, lock_back_edges, jobs.size(), &main_plan));
      }
      LinkMainPlan(&plan, std::move(main_plan), identity_tick_op_names);
      PlanUtil::CleanUselessMemBlockAndCheckValid(&plan);
      PlanUtil::DumpCtrlRegstInfoToPlan(&plan);
      if (Global<ResourceDesc, ForSession>::Get()->enable_debug_mode()) {
        TeePersistentLogStream::Create("merged_plan")->Write(plan);
        PlanUtil::ToDotFile(plan, "/dot/merged_plan.dot");
      }
      return Maybe<void>::Ok();
    }
    

    启动

    编译完成之后,就可以启动 Runtime 了。

    启动 Runtime 主要做几件事情:

    • 所有需要 Plan 的全局对象,调用 AddPlan 将 Plan 传给他们
    • 分解 Plan 的 Task,每个 task 一个 actor,根据 task 上的 job_id 信息,创建 actor 的大小
    • 构建 RuntimeCtx,调用 HandoutTasks 分发 task,并且发送 ActorCmd::kConstructActor 启动 Actor。
    • 向所有 source_tasks 发送 ActorCmd::kStart 启动 actor。
    // oneflow/core/job/runtime.cpp: 60
    Runtime::Runtime(const Plan& plan, const HashMap<std::string, Blob*>& variable_op_name2eager_blob) {
      {
        // NOTE(chengcheng): All runtime Global objects AddPlan
        Global<RegstMgr>::Get()->AddPlan(plan, variable_op_name2eager_blob);
        Global<ThreadMgr>::Get()->AddPlan(plan);
        Global<RuntimeJobDescs>::Get()->AddPlan(plan);
        collective_boxing_executor_plan_token_ =
            Global<boxing::collective::CollectiveBoxingExecutor>::Get()->AddPlan(plan);
      }
      std::vector<const TaskProto*> source_tasks;
      std::vector<const TaskProto*> other_tasks;
      int64_t this_machine_task_num = 0;
      for (const TaskProto& task : plan.task()) {
        if (task.machine_id() != GlobalProcessCtx::Rank()) { continue; }
        if (!HasNonCtrlConsumedRegstDescId(task)) {
          source_tasks.push_back(&task);
        } else {
          other_tasks.push_back(&task);
        }
        auto it = job_id2actor_size_.find(task.job_id());
        if (it == job_id2actor_size_.end()) {
          auto emplace_ret_pair = job_id2actor_size_.emplace(task.job_id(), 0);
          CHECK(emplace_ret_pair.second);
          it = emplace_ret_pair.first;
        }
        it->second++;
        this_machine_task_num++;
      }
      RuntimeCtx* runtime_ctx = Global<RuntimeCtx>::Get();
      runtime_ctx->NewCounter("constructing_actor_cnt", this_machine_task_num);
      HandoutTasks(source_tasks);
      HandoutTasks(other_tasks);
      runtime_ctx->WaitUntilCntEqualZero("constructing_actor_cnt");
      LOG(INFO) << "Actors on this machine constructed";
      OF_SESSION_BARRIER();
      LOG(INFO) << "Actors on every machine constructed";
      for (auto pair : job_id2actor_size_) {
        runtime_ctx->NewCounter(GetRunningActorCountKeyByJobId(pair.first), pair.second);
      }
      SendCmdMsg(source_tasks, ActorCmd::kStart);
    }
    
    // oneflow/core/job/runtime.cpp: 43
    void HandoutTasks(const std::vector<const TaskProto*>& tasks) {
      for (const TaskProto* task : tasks) {
        Global<ThreadMgr>::Get()->GetThrd(task->thrd_id())->AddTask(*task);
      }
      SendCmdMsg(tasks, ActorCmd::kConstructActor);
    }
    

    总结

    总结一下 StartLazyGlobalSession,在进入这个方法之前,已经有 JobSet 了,这个 JobSet 是经过 CurJobBuildAndInferCtx_Complete 优化改写了。接下来进入 StartLazyGlobalSession,它会添加更多的 Job 用于模型 IO,用于推送输入、拉取输出,编译连接成 MergedPlan。有了 MergedPlan 之后,就可以带着这个 Plan 启动运行时,启动 Actor。

  • 相关阅读:
    转载(腾讯云社区)——详解django-apscheduler的使用方法
    pipenv——python包管理工具
    xx系统需求进度02
    xx系统需求进度01
    Hbase简介
    第七周总结
    《软件需求十步走》阅读笔记一
    第六周总结
    HDFS
    金字塔表达方法
  • 原文地址:https://www.cnblogs.com/zzk0/p/15217438.html
Copyright © 2011-2022 走看看