前言
前面在初始化 Session 的时候,通过 CurJobAddOp 将 Op 加入到计算图当中,实际上只是将 Op 加入到 Job 里面,而 Job 只是一个 Protobuf Message 罢了。如果用户定义了多个 Job,那么这些 Job 就会构成一个 JobSet。用户将算子添加完之后,就会调用 Complete 对计算图 (其实就是 Job) 进行优化改写。接下来就是启动 Session,启动 Session 的时候进行了什么重要的事情呢?这篇文章就来分析一下。
- 结论:启动 Session 的时候将逻辑上的 Job 编译为物理上的 Plan,启动 Runtime 去执行 Plan。
流程回顾
在 Session 初始化的时候,我们可以看到先调用了 InitLazyGlobalSession,然后调用 compiler.Compile 将 Op 逐个加入计算图,接下来就是启动 Session,调用 StartLazyGlobalSession。StartLazyGlobalSession 背后做了什么操作呢?
# python/oneflow/compatible/single_client/framework/session_util.py: 183
def Init(self):
assert self.status_ is SessionStatus.OPEN
self.status_ = SessionStatus.RUNNING
if not oneflow._oneflow_internal.IsEnvInited():
flow.env.init()
_TryCompleteConfigProto(self.config_proto)
self.resource_ = self.config_proto.resource
if not oneflow._oneflow_internal.EagerExecutionEnabled():
c_api_util.InitLazyGlobalSession(self.config_proto)
for (job_name, func_desc) in self.job_name2function_desc_.items():
compiler.Compile(self, func_desc, self.config_proto)
self.existed_module_names_ = set()
self.job_name2var_name2var_blob_ = dict()
assert len(self.job_name2function_desc_.items()) > 0
oneflow._oneflow_internal.StartLazyGlobalSession()
self.inter_user_job_info_ = c_api_util.GetInterUserJobInfo()
self.UpdateInfo4InterfaceOp()
if not config_util.api_legacy_model_io_enabled():
check_point_v2.Init()
else:
self.eager_config_proto_ctx_ = oneflow._oneflow_internal.LogicalConfigProtoContext(
str(self.config_proto)
)
return self
StartLazyGlobalSession
回想一下,我们进入这个方法之前的状态,我们有一个 JobBuildAndInferCtxMgr,里面存有 JobSet。用户定义一个 Job,就 JobSet 就多一个 Job。这个 Job 的状态是什么样子的呢?这个 Job 是用户定义的 Job 函数转化过来的,并且经过了 CurJobBuildAndInferCtx_Complete 优化改写了。
StartLazyGlobalSession 启动 Session,背后做了什么呢?带着问题单步调试跟踪进去看一看。
- 在 StartLazyGlobalSession 中获取 JobSet,从 JobBuildAndInferCtxMgr 直接拿到。这个细节很重要,JobSet 是承接上一个部分的线索。其实第二篇文章分析 Python 端构图的时候,没有深入 CurJobAddOp 去,因为里面涉及到了 SBP 的推导等。JobSet 是一个 Protobuf message,它的成员是可重复的 Job。JobSet 有 LazyJobBuildAndInferCtxMgr 进行管理,在打开一个 JobBuildAndInferCtx 的时候,会在 JobSet 中新增一个 Job,然后将 Job 传给 JobBuildAndInferCtx。
- StartLazyGlobalSession 中最重要的操作是创建一个全局的 Oneflow 对象,然后使用 JobSet 去初始化这个 Oneflow 对象。JobSet 会这个过程中编译成 Plan,然后启动 Runtime。
// oneflow/api/python/session/session.h: 88
inline Maybe<void> StartLazyGlobalSession() {
CHECK_NOTNULL_OR_RETURN(Global<SessionGlobalObjectsScope>::Get()) << "session not found";
CHECK_OR_RETURN(GlobalProcessCtx::IsThisProcessMaster());
const JobSet& job_set = Global<LazyJobBuildAndInferCtxMgr>::Get()->job_set();
if (Global<ResourceDesc, ForSession>::Get()->enable_debug_mode()) {
TeePersistentLogStream::Create("job_set.prototxt")->Write(job_set);
}
if (job_set.job().empty()) { return Error::JobSetEmptyError() << "no function defined"; }
CHECK_ISNULL_OR_RETURN(Global<Oneflow>::Get());
Global<CtrlClient>::Get()->PushKV("session_job_set", job_set);
Global<const InterJobReuseMemStrategy>::New(job_set.inter_job_reuse_mem_strategy());
Global<Oneflow>::New();
JUST(Global<Oneflow>::Get()->Init(job_set));
return Maybe<void>::Ok();
}
// oneflow/core/job/job_build_and_infer_ctx_mgr.h: 38
class JobBuildAndInferCtxMgr {
public:
// ...
const JobSet& job_set() const { return job_set_; }
// ...
}
- 在 Oneflow 全局对象在初始化的过程中,调用 CompileJobsAndPushMergedPlan 将 Job 编译为 MergedPlan。如果不是 Master 节点,那么不会进行编译,会调用 PullPlan 从 Master 拉取 Plan。最后使用 Plan 初始化 Runtime。
// oneflow/core/job/oneflow.cpp: 1005
Maybe<void> Oneflow::Init(const oneflow::JobSet& job_set) {
OF_PROFILER_RANGE_GUARD("Oneflow::Init");
// Runtime
OF_PROFILER_RANGE_PUSH("CompileJobsAndPushMergedPlan");
JUST(CompileJobsAndPushMergedPlan(job_set.job()));
OF_PROFILER_RANGE_POP(); // CompileJobsAndPushMergedPlan
double start = GetCurTime();
PullPlan("merged_plan", &plan_);
LOG(INFO) << " PullPlan merged_plan time: " << (GetCurTime() - start) / 1e9 << " seconds.
";
if (GlobalProcessCtx::IsThisProcessMaster()) {
runtime_buffers_scope_.reset(new RuntimeBuffersScope(plan_.job_confs()));
}
OF_PROFILER_RANGE_PUSH("new Runtime");
if (Global<ResourceDesc, ForSession>::Get()->enable_dry_run()) {
LOG(ERROR) << "this is dry run, exiting";
exit(0);
}
HashMap<std::string, Blob*> variable_op_name2eager_blob;
runtime_.reset(new Runtime(plan_, variable_op_name2eager_blob));
OF_PROFILER_RANGE_POP(); // new Runtime
return Maybe<void>::Ok();
}
编译 Job 为 MergedPlan
- CompileJobsAndPushMergedPlan 输入是 PbRfp,它是 Protobuf Repeated Field Ptr 的意思,可以认为这个函数的输入就是一个 Job 数组。这个方法,只由 Master 节点执行,即由 Master 生成 Plan。
// oneflow/core/job/oneflow.cpp: 985
Maybe<void> CompileJobsAndPushMergedPlan(const PbRpf<Job>& job_confs) {
if (GlobalProcessCtx::IsThisProcessMaster()) {
Plan plan;
JUST(CompileJobsAndMergePlans(job_confs, plan));
double start = GetCurTime();
// push op_attribute_info
OpAttributeInfo op_attribute_info;
*op_attribute_info.mutable_job_id2op_attribute_ref_table() =
plan.job_id2op_attribute_ref_table();
Global<CtrlClient>::Get()->PushKV("op_attribute_info", op_attribute_info);
// push plan
PushPlan("merged_plan", std::move(plan));
LOG(INFO) << " PushPlan merged_plan time: " << (GetCurTime() - start) / 1e9 << " seconds.
";
}
OF_SESSION_BARRIER();
return Maybe<void>::Ok();
}
- CompileJobsAndMergePlans 在上面的流程中,仅仅只是不断调用方法,实际上什么都还没干呢。下面这个方法,开始干活了。不过这篇文章暂时不深入细节,重要的是先理清楚流程。
下面的代码有点长,这个方法主要做的事情有:
- 添加 Model IO Job
- 添加 Push Job 和 Pull Job
- CompileCurJobOnMaster 逐个编译 Job,MergeSubPlan 将 Job 合并
- Job 之间的内存复用和内存共享
- FinishGlobalCriticalSectionDesc 划分临界区
- MainJob 的生成、编译、链接
// oneflow/core/job/oneflow.cpp: 912
Maybe<void> CompileJobsAndMergePlans(const PbRpf<Job>& job_confs, Plan& plan) {
std::vector<std::shared_ptr<Job>> jobs(job_confs.size());
FOR_RANGE(int, i, 0, jobs.size()) { jobs.at(i).reset(new Job(job_confs.Get(i))); }
if (jobs.size() > 1) { CheckNonDistributeOptimizerAvailable(jobs); }
HashMap<std::string, ParallelBlobConf> var_op_name2parallel_blob_conf;
FilterOpName2ParallelBlobConf({OperatorConf::kVariableConf}, jobs,
&var_op_name2parallel_blob_conf);
auto AppendJob = [&](Job* job) {
JobDesc job_desc(job->job_conf(), jobs.size());
CHECK(!job_desc.Bool("__is_user_function__"));
jobs.emplace_back(new Job(*job));
};
if (Global<ResourceDesc, ForSession>::Get()->resource().enable_legacy_model_io()) {
if (Global<ResourceDesc, ForSession>::Get()->resource().enable_model_io_v2()) {
MakeModelIoV2Jobs(jobs, var_op_name2parallel_blob_conf, AppendJob);
} else {
MakeModelIoJobs(jobs, var_op_name2parallel_blob_conf, AppendJob);
}
}
std::vector<std::shared_ptr<Job>> function_jobs;
function_jobs.reserve(jobs.size());
FOR_RANGE(int, i, 0, jobs.size()) {
JobDesc job_desc(jobs.at(i)->job_conf(), i);
if (job_desc.Bool("__is_user_function__")) { function_jobs.push_back(jobs.at(i)); }
}
HashMap<std::string, ParallelBlobConf> push_op_name2parallel_blob_conf;
FilterOpName2ParallelBlobConf({OperatorConf::kInputConf}, function_jobs,
&push_op_name2parallel_blob_conf);
HashMap<std::string, ParallelBlobConf> pull_op_name2parallel_blob_conf;
FilterOpName2ParallelBlobConf({OperatorConf::kReturnConf}, function_jobs,
&pull_op_name2parallel_blob_conf);
for (const auto& pair : push_op_name2parallel_blob_conf) {
auto push_job = std::make_shared<Job>();
MakePushJob(std::string("System-Push-") + pair.first, pair.first, pair.second, push_job.get());
jobs.emplace_back(push_job);
}
for (const auto& pair : pull_op_name2parallel_blob_conf) {
auto pull_job = std::make_shared<Job>();
MakePullJob(std::string("System-Pull-") + pair.first, pair.first, pair.second, pull_job.get());
jobs.emplace_back(pull_job);
}
std::vector<Plan> sub_plans(jobs.size());
FOR_RANGE(int64_t, i, 0, jobs.size()) {
AddJobName2JobId(jobs.at(i)->job_conf().job_name(), i);
auto scope = std::make_unique<GlobalJobDescScope>(jobs.at(i)->job_conf(), i);
JUST(CompileCurJobOnMaster(jobs.at(i).get(), &sub_plans.at(i), true));
}
MergeSubPlan(&plan, std::move(sub_plans));
InterJobMemSharingUtil::MergeMemReusedChunkBetweenUserJobs(function_jobs, &plan);
InterJobMemSharingUtil::MergeMemSharedInterfaceMemBlockBetweenJobs(jobs, &plan);
PlanUtil::SetForceInplaceMemBlock(&plan);
FinishGlobalCriticalSectionDesc(plan, jobs.size());
Plan main_plan;
std::vector<std::map<int64_t, std::string>> identity_tick_op_names;
{
Job main_job;
std::vector<ReentrantLockBackEdge> lock_back_edges;
JUST(MakeMainJob(&main_job, &identity_tick_op_names, &lock_back_edges));
AddJobName2JobId(main_job.job_conf().job_name(), jobs.size());
JUST(CompileMainJob(&main_job, lock_back_edges, jobs.size(), &main_plan));
}
LinkMainPlan(&plan, std::move(main_plan), identity_tick_op_names);
PlanUtil::CleanUselessMemBlockAndCheckValid(&plan);
PlanUtil::DumpCtrlRegstInfoToPlan(&plan);
if (Global<ResourceDesc, ForSession>::Get()->enable_debug_mode()) {
TeePersistentLogStream::Create("merged_plan")->Write(plan);
PlanUtil::ToDotFile(plan, "/dot/merged_plan.dot");
}
return Maybe<void>::Ok();
}
启动
编译完成之后,就可以启动 Runtime 了。
启动 Runtime 主要做几件事情:
- 所有需要 Plan 的全局对象,调用 AddPlan 将 Plan 传给他们
- 分解 Plan 的 Task,每个 task 一个 actor,根据 task 上的 job_id 信息,创建 actor 的大小
- 构建 RuntimeCtx,调用 HandoutTasks 分发 task,并且发送 ActorCmd::kConstructActor 启动 Actor。
- 向所有 source_tasks 发送 ActorCmd::kStart 启动 actor。
// oneflow/core/job/runtime.cpp: 60
Runtime::Runtime(const Plan& plan, const HashMap<std::string, Blob*>& variable_op_name2eager_blob) {
{
// NOTE(chengcheng): All runtime Global objects AddPlan
Global<RegstMgr>::Get()->AddPlan(plan, variable_op_name2eager_blob);
Global<ThreadMgr>::Get()->AddPlan(plan);
Global<RuntimeJobDescs>::Get()->AddPlan(plan);
collective_boxing_executor_plan_token_ =
Global<boxing::collective::CollectiveBoxingExecutor>::Get()->AddPlan(plan);
}
std::vector<const TaskProto*> source_tasks;
std::vector<const TaskProto*> other_tasks;
int64_t this_machine_task_num = 0;
for (const TaskProto& task : plan.task()) {
if (task.machine_id() != GlobalProcessCtx::Rank()) { continue; }
if (!HasNonCtrlConsumedRegstDescId(task)) {
source_tasks.push_back(&task);
} else {
other_tasks.push_back(&task);
}
auto it = job_id2actor_size_.find(task.job_id());
if (it == job_id2actor_size_.end()) {
auto emplace_ret_pair = job_id2actor_size_.emplace(task.job_id(), 0);
CHECK(emplace_ret_pair.second);
it = emplace_ret_pair.first;
}
it->second++;
this_machine_task_num++;
}
RuntimeCtx* runtime_ctx = Global<RuntimeCtx>::Get();
runtime_ctx->NewCounter("constructing_actor_cnt", this_machine_task_num);
HandoutTasks(source_tasks);
HandoutTasks(other_tasks);
runtime_ctx->WaitUntilCntEqualZero("constructing_actor_cnt");
LOG(INFO) << "Actors on this machine constructed";
OF_SESSION_BARRIER();
LOG(INFO) << "Actors on every machine constructed";
for (auto pair : job_id2actor_size_) {
runtime_ctx->NewCounter(GetRunningActorCountKeyByJobId(pair.first), pair.second);
}
SendCmdMsg(source_tasks, ActorCmd::kStart);
}
// oneflow/core/job/runtime.cpp: 43
void HandoutTasks(const std::vector<const TaskProto*>& tasks) {
for (const TaskProto* task : tasks) {
Global<ThreadMgr>::Get()->GetThrd(task->thrd_id())->AddTask(*task);
}
SendCmdMsg(tasks, ActorCmd::kConstructActor);
}
总结
总结一下 StartLazyGlobalSession,在进入这个方法之前,已经有 JobSet 了,这个 JobSet 是经过 CurJobBuildAndInferCtx_Complete 优化改写了。接下来进入 StartLazyGlobalSession,它会添加更多的 Job 用于模型 IO,用于推送输入、拉取输出,编译连接成 MergedPlan。有了 MergedPlan 之后,就可以带着这个 Plan 启动运行时,启动 Actor。