本文为《一个Job在OneFlow中的执行过程》系列文章的下篇,也是最后一篇。在之前的文章一个Job在OneFlow中的执行过程—上篇中,从bottom up的角度,简单讲解了一个Job(用户定义的训练/预测任务)在Oneflow中的调用入口、数据流转过程、从python端到c++端的代码执行流程;在《一个Job在OneFlow中的执行过程—中篇》中重点介绍了编译期过程,详细梳理编译期过程细节、代码执行流程。

本文则重点讲解Runtime运行时过程,主要内容分为两部分:

1.Runtime运行时相关概念2.Runtime各个系统及代码梳理

其中,1.Runtime运行时相关概念主要围绕Plan、Actor、Task、Thread等展开;2.运行时各个系统主要包含:

  • 运行时Actor

  • 内存管理系统Memory Allocator和RegstMgr

  • 线程管理系统Thread Manager

  • 数据/消息通信系统CommNet

  • 集合通信管理系统CollectiveBoxingExecutor

等系统/模块,由于每个部分都很庞大,故本文无法一一覆盖,重点在于通过代码梳理和介绍各个系统的作用及其相互关系,以及从Runtime启动到任务执行、结束、Runtime下线的完整过程。日后会推出系列文章,详细介绍OneFlow系统中各个模块的功能及设计,敬请期待!

需要特别说明的是,此系列文章仅在于描述流程、重点模块的代码,oneflow正处于快速版本更新和迭代过程,所以很多api在未来可能会有较大幅度的变动。目前在推进的工作有interface 1.0工作,包含了对齐pytorch api、multi client设计、构图编译期优化等,敬请期待!


1.运行时相关概念

1.1 Task、Actor和Plan

Plan是运行时计划,同时它也是由Jobs经过编译后得到的物理计算图,其包含Runtime运行时任务执行所需的所有信息。一个Plan拓扑图的示意如下:

Plan中的每个节点是一个用Task描述的任务,在Runtime运行时启动后,会依据每个Task描述的信息实例化其对应的Actor,并由Actor实际负责执行相应的Task。简单来说,Plan是整体的运行时计划,Actor为去中心化任务调度执行的最小控制/执行者,而Task则用来描述一个任务的具体信息。

Task通过oneflow/oneflow/core/job/task.proto[1]中的TaskProto定义了一个任务所需的基本信息:

message TaskProto {// commonrequired TaskType task_type = 1;required int64 machine_id = 2;required int64 thrd_id = 3;required int64 task_id = 4;required int64 job_id = 5;required TaskSetInfo task_set_info = 6;required ExecSequence exec_sequence = 7;map<string, RegstDescProto> produced_regst_desc = 8;map<string, RegstDescIdSet> consumed_regst_desc_id = 9;// compute taskoptional ParallelContext parallel_ctx = 1000; // CompTask
};

其中,根据TaskType中的枚举类型,我们可以定义不同类型的Task,如:

  • 普通前向任务kNormalForward

  • 负责将host memory内存搬运到device上的kCopyHd;

  • 可重入锁相关的kReentrantLock;

  • 各种Tick信号任务相关的kDeviceTick、kSourceTick、kAccTick;

  • 集合通信boxing相关的kCollectiveBoxingGeneric等

enum TaskType {kInvalid = 0;kNormalForward = 1;kDecode = 5;kCopyHd = 12;kCopyCommNet = 13;kPrint = 15;kRecordLoad = 16;kDeviceTick = 27;kDecodeRandom = 29;kPack = 30;kUnpack = 32;kRepeat = 34;kAcc = 37;kSourceTick = 40;kTick = 41;kAccTick = 42;kCase = 43;kEsac = 44;kWaitAndSendIds = 45;kReentrantLock = 46;kCallbackNotify = 47;kForeignInput = 48;kForeignOutput = 49;kDistributeConcat = 55;kDistributeSplit = 56;kSliceBoxing = 57;kCollectiveBoxingGeneric = 58;kBoxingIdentity = 59;kDecodeH2D = 60;kBoxingS2SAll2AllPack = 61;kBoxingS2SAll2AllUnpack = 62;kSspVariableProxy = 63;kBoxingZeros = 64;
};

除了任务类型外,我们看到,还有map<string, RegstDescProto> produced_regst_desc = 8;map<string, RegstDescIdSet> consumed_regst_desc_id = 9;这些变量记录了Task运行期间消费/生产的Regst内存块信息,这些信息记录了任务间的执行顺序、消费关系的拓扑关系。

1.2 Thread和Message

Thread即线程,由全局的ThreadMgr管理,其作用主要负责Actor的生命周期:创建、运行、销毁等。 Thread和Actor是一对多的关系,一个Thread负责管理1~多个Actor,每个Actor都有其对应的唯一Thread进行管理。

Message即消息,因为Actor运行时虽然是去中心化的,不过通常本机Actor之间或和其他机器的Actor间,也需要通过消息传递信息,这时就需要用ActorMsgBus来进行消息的通信和传递。关于消息类型、以及负责消息传递的ActorMsgBus会在下文中详细说明。

之前同事成诚在文章《仅此一文让您掌握OneFlow框架的系统设计(下篇)》[2]中描述的很详细,我这里就直接搬运了:)

消息的类型

在oneflow/core/actor/actor_message.h[3]中,我们可以看到消息类型分为以下几种:

enum class ActorMsgType { kRegstMsg = 0, kEordMsg, kCmdMsg };
  • kRegstMsg: 表示这个ActorMsg包含了一个Regst。这是运行时Actor之间通信最主要的消息,生产者生产一个Regst通知下游消费者的消息、消费者使用完Regst返还给生产者说我用完了,都是RegstMsg。可以从ActorMsg的regst()接口中拿到该Regst。需要注意的是,无论是生产者通知消费者的消息,还是消费者用完的Ack消息,都是同一种消息。OneFlow的Actor通信中是不需要指明“Ack”的。各个Actor在处理ActorMsg的时候都可以从Regst中得知是不是Ack。

  • kCmdMsg: 一些控制指令信号。不包含数据。如kConstructActor(Thread直接处理的消息,用于Thread创建Actor);kStart,Actor启动并开始工作。运行时靠着Start消息的传染,整个计算图开始工作。

  • kEordMsg: 表示任务结束,Actor可以切换到Zombie状态。运行时靠着Eord消息的传染,整个计算图中的Actor均切换到Zombie状态,等待销毁和RunTime下线。运行时的结束不是一下子就结束的,有可能计算图的源节点已经发出了Eord的信号,并将自己切换成Zombie状态,而计算图中的后半部分还在工作中。

消息的路由

ActorMsgBus相当于一个消息的路由,会判断该消息的目的地是否是本机,如果是本机,则通过ThreadMgr找到对应的Thread,然后EnqueueActorMsg。如果消息的目的地是其他机器,则通过Global对象CommNet将该消息发送给其他机器。其他机器的Global对象再收到这个消息以后会通知本机的ActorMsgBus做消息处理。这样就完成了一个消息从消息的生产者Actor到消费者Actor的传递。

Actor间的消息传递

当一个Actor需要给另一个Actor发消息时,会判断接收者Actor:

  • 是否是本线程内:

    • 如果是,则ActorMsgBus会找到本机内的对应线程Thread,传入到该Thread的Msg channel中

    • 否则:调用本机器的CommNet对象传输该消息。 接收者所在机器的CommNet对象收到消息后会转给该机器的ActorMsgBus处理。该机器的ActorMsgBus会找到对应的线程Thread将该消息传入线程的MsgChannel中

    • 如果是,则直接压入Thread的LocalMsgQueue中 (最快)

    • 否则:调用本机器的ActorMsgBus传输数据。 ActorMsgBus会判断接收者是否在本机内

  • Thread会不断轮询自己的LocalMsgQueue,取出对应的消息找到对应的Actor去处理该消息。 如果LocalMsgQueue为空,则尝试去从MsgChannel中取消息放到LocalMsgQueue中。

1.3 Regst和多级内存体系

上面主要介绍了Actor、Thread、Message相关概念,下面说说Regst和OneFlow中的多级内存体系,毕竟Actor执行最多的还是矩阵计算相关的任务,而这些任务往往涉及到内存的使用和释放,这就涉及到RegstMgrRegst。简单来说,Regst是OneFlow运行时的使用内存的最小单元;RegstMgr则是全局的Regst管理者。在Runtime初始化后,会通过NewAllGlobal(plan, total_piece_num, is_experiment_phase);创建系统所需的各种全局所需的Global对象,如ThreadMgr、RegstMgr等。

RegstMgr

RegstMgr在初始化时就会根据Plan申请所有的本机上的内存:主机内存HostMemory、HostPinnedMemory(For CUDA CopyH2D)、设备内存DeviceMemory、锁页内存LockedMemory(For RDMA)等。并根据Plan中的Regst配置信息分配相应的内存地址给Regst。Regst的内存地址是固定的,直到运行时结束Regst的内存地址和大小都不会变化。OneFlow的静态内存管理是Runtime启动时统一分配,Runtime结束时统一销毁。运行时的内存调度开销是0。

Regst

Regst是OneFlow运行时的基本内存单元,也是基本的消息单元,Actor之间的通信、所有的数据生产、消费、回收都是Regst。由于OneFlow是静态内存分配,内存的分时复用调度是编译期的内存复用算法已经做好了(通过控制边+offset方式),所以运行时仅需要按照编译期生成的MemChunk、MemBlock、Regst的配置描述(RegstDescProto[4])信息一次性申请内存,并分配给对应的Regst即可。

Regst存储了两类信息:

  • 生产者Actor id和消费者 Actor ids。一个Regst的生产者是唯一的,消费者可能有多个。

  • Blob的信息

多级内存体系

MemCaseRegst通过MemCase[5]标记了自己所属的内存类型,如果是GPU上的显存,还需要标记自己所属的DeviceId。如果是CPU上的主存,会标记该Regst是否是被CopyHD或CommNet所使用的。Regst通过MemBlockId和MemBlockOffset标记了自己所属于哪个MemBlock以及对应的偏移量。MemoryAllocator根据MemCase和Size申请对应大小和类型的内存块,返回内存块首地址; 根据内存地址回收内存。在Lazy情况下,仅在Runtime的启动/结束时(RegstMgr的构造函数和析构函数里)才会申请/释放内存。MemBlock与Chunk这是OneFlow的多级内存设计:Chunk -> MemBlock -> Regst

  • MemBlock: 同一个Chain(MemChain,通常是GPU上的前后向的所有activation regsts在一个MemChain中,Optimizer子图部分的Regst在各自的MemChain中)内的Regst根据分时复用的原则共用一个MemBlock的不同段,通过size和offset标记。内存复用算法会尽可能让MemBlock的Size小,同时满足互斥的Regst(生命周期有重叠的)不会有内存区域的重叠。

  • Chunk:一个Job内在同一块GPU上的MemBlock的合集称为一个Chunk。Chunk的Size是所有内部MemBlock的Size之和。(即同一个Chunk内部的MemBlock之间没有复用内存)

  • 多个Job在同一个块GPU上的Chunk,会根据Job之间的互斥关系,完整复用一个大的Chunk(取最大值)作为最终的Chunk。如TrainJob和EvalJob互斥,所以TrainJob的所有可复用的Regst的总Chunk跟Eval的总Chunk合并复用一块内存。通常情况下,Eval只有前向,比TrainJob计算图要小,可以完全被TrainJob的Chunk所包含。即新增一个EvalJob不会新增任何内存。

2.Runtime各个系统及代码梳理

启动Runtime

在前面的文章中我们说过,session init后,会通过Oneflow::Init()[6]方法中的CompileAndMergePlanOnMaster()完成整个Job逻辑图、物理图的编译及Plan生成的过程:

Maybe<void> Oneflow::Init(const oneflow::JobSet& job_set) {OF_PROFILER_RANGE_GUARD("Oneflow::Init");// RuntimeOF_PROFILER_RANGE_PUSH("CompileAndMergePlanOnMaster");JUST(CompileAndMergePlanOnMaster(job_set.job(), &plan_));OF_PROFILER_RANGE_POP();  // CompileAndMergePlanOnMasterif (Global<MachineCtx>::Get()->IsThisMachineMaster()) {runtime_buffers_scope_.reset(new RuntimeBuffersScope(plan_));}OF_PROFILER_RANGE_PUSH("new Runtime");runtime_.reset(new Runtime(plan_, GetMaxVal<size_t>(), false));OF_PROFILER_RANGE_POP();  // new Runtimereturn Maybe<void>::Ok();
}

Plan生成后,就会创建新的Runtime运行时。Runtime部分的代码在oneflow/core/job/runtime.cpp#L63[7]

需要注意的是Runtime创建后,各个任务不立刻执行,而是等待tick信号触发后才会执行。

Runtime::Runtime(const Plan& plan, size_t total_piece_num, bool is_experiment_phase) {NewAllGlobal(plan, total_piece_num, is_experiment_phase);std::vector<const TaskProto*> source_tasks;std::vector<const TaskProto*> other_tasks;int64_t this_machine_task_num = 0;for (const TaskProto& task : plan.task()) {if (task.machine_id() != Global<MachineCtx>::Get()->this_machine_id()) { continue; }if (!HasNonCtrlConsumedRegstDescId(task)) {source_tasks.push_back(&task);} else {other_tasks.push_back(&task);}this_machine_task_num += 1;}RuntimeCtx* runtime_ctx = Global<RuntimeCtx>::Get();runtime_ctx->NewCounter("constructing_actor_cnt", this_machine_task_num);HandoutTasks(source_tasks);HandoutTasks(other_tasks);runtime_ctx->WaitUntilCntEqualZero("constructing_actor_cnt");LOG(INFO) << "Actors on this machine constructed";OF_SESSION_BARRIER();LOG(INFO) << "Actors on every machine constructed";if (Global<CommNet>::Get()) { Global<CommNet>::Get()->RegisterMemoryDone(); }OF_SESSION_BARRIER();runtime_ctx->NewCounter("running_actor_cnt", this_machine_task_num);SendCmdMsg(source_tasks, ActorCmd::kStart);
}

Runtime部分的代码主要做了以下几件事情:

  • 创建系统Global对象

  • 遍历Task构建Actor对象

  • 更改Actor工作状态(为待执行)

下面,我们将分别梳理这3个部分所涉及到的系统及代码。

2.1 创建系统Global对象

在Runtime启动后,首先通过NewAllGlobal(plan, total_piece_num, is_experiment_phase);创建了系统所需的各种全局所需的Global对象,如CommNet、MemoryAllocator、RegstMgr、ActorMsgBus、ThreadMgr等。这些Global对象的作用,在文章《仅此一文让您掌握OneFlow框架的系统设计(下篇)》[8]中有介绍:

  • CommNet[9]: CommNet是OneFlow分布式训练中负责多机数据传输和消息通信的模块。底层有基于Epoll的实现和基于RDMA的实现。

  • boxing::collective::CollectiveBoxingExecutor & boxing::collective::CollectiveBoxingDeviceCtxPoller:负责执行集合通信操作(NCCL)

  • MemoryAllocator[10]: 负责内存(Host内存 和 GPU显存)的申请与释放

  • RegstMgr[11]:负责创建所有的Regst (Mgr是Manager的缩写)

  • ActorMsgBus[12]: 负责运行时Actor之间的消息通信 (Msg是Message的缩写)

  • ThreadMgr[13]:负责创建和管理所有的Thread

2.2 遍历Task构建Actor对象

创建完各种运行时所需的Global对象后,会遍历Plan中的所有Task,并构建与其一一对应的Actor。

遍历Tasks

在Runtime中,通过plan.task()方法拿到运行时plan所包含的所有tasks,对其中将在本机执行的tasks分为source tasks和other tasks两类。

判断一个task是否为源task,主要通过HasNonCtrlConsumedRegstDescId()方法,简单来说,就是通过Task的consumed_regst_desc_id属性,若key-value对中不存在“in_ctrl”的key,则表示其为源task。

遍历后的Task分别插入对应的task vector中,这一部分代码如下:

 std::vector<const TaskProto*> source_tasks;std::vector<const TaskProto*> other_tasks;int64_t this_machine_task_num = 0;for (const TaskProto& task : plan.task()) {if (task.machine_id() != Global<MachineCtx>::Get()->this_machine_id()) { continue; }if (!HasNonCtrlConsumedRegstDescId(task)) {source_tasks.push_back(&task);} else {other_tasks.push_back(&task);}this_machine_task_num += 1;}

HandoutTasks

task区分开以后,针对source_tasks和other_tasks分别构造对应的Actor,并通过HandoutTasks方法触发source_tasks和other_tasks初始化相应的Actor:

RuntimeCtx* runtime_ctx = Global<RuntimeCtx>::Get();runtime_ctx->NewCounter("constructing_actor_cnt", this_machine_task_num);HandoutTasks(source_tasks);HandoutTasks(other_tasks);runtime_ctx->WaitUntilCntEqualZero("constructing_actor_cnt");LOG(INFO) << "Actors on this machine constructed";OF_SESSION_BARRIER();LOG(INFO) << "Actors on every machine constructed";

具体的,HandoutTasks主要通过SendCmdMsg()方法来向消息路由器ActorMsgBus发送Actor初始化的指令:kConstructActor:

void SendCmdMsg(const std::vector<const TaskProto*>& tasks, ActorCmd cmd) {for (const TaskProto* task : tasks) {ActorMsg msg = ActorMsg::BuildCommandMsg(task->task_id(), cmd);Global<ActorMsgBus>::Get()->SendMsg(msg);}
}void HandoutTasks(const std::vector<const TaskProto*>& tasks) {for (const TaskProto* task : tasks) {Global<ThreadMgr>::Get()->GetThrd(task->thrd_id())->AddTask(*task);}SendCmdMsg(tasks, ActorCmd::kConstructActor);
}

ActorMsgBus负责将Actor初始化的信息发送到相应机器上(本机/其他机器),最终Thread都会通过轮训本地的消息队列(local_msg_queue_)获取到Actor初始化到信息,并通过ConstructActor()方法,触发对应id的Actor完成初始化Init。

void Thread::PollMsgChannel(const ThreadCtx& thread_ctx) {while (true) {if (local_msg_queue_.empty()) {CHECK_EQ(msg_channel_.ReceiveMany(&local_msg_queue_), kChannelStatusSuccess);}ActorMsg msg = std::move(local_msg_queue_.front());local_msg_queue_.pop();if (msg.msg_type() == ActorMsgType::kCmdMsg) {if (msg.actor_cmd() == ActorCmd::kStopThread) {CHECK(id2actor_ptr_.empty());break;} else if (msg.actor_cmd() == ActorCmd::kConstructActor) {ConstructActor(msg.dst_actor_id(), thread_ctx);continue;} else {// do nothing}}.....}
}

Actor初始化

Actor Init时主要做了以下事情:

  • 根据ThreadCtx创建DeviceCtx。 运行时的Context有三级: ThreadCtx->DeviceCtx->KernelCtx 。

  • 构造Kernel(ConstructKernel)

  • 创建Regst(NewRegsts) 在调用RegstMgr->NewRegsts之前,RegstMgr已经给所有的Regst都申请好了内存,NewRegsts更应该像是GetRegsts。对于同一个RegstDesc,根据其regst_num会有多个Regst实例

  • 处理下游消费的Regst(RegstDescId)以及Regst之间的Inplace

  • 虚接口VirtualActorInit,供各个子类Actor自己重载自定义的初始化内容

初始化相关的代码在:oneflow/oneflow/core/actor/actor.cpp[14],当Actor初始化完毕以后,Actor就进入了等待状态。在Actor收到Eord信号并销毁之前,Actor一直都在等待状态和执行状态之间切换。Actor所有的逻辑都通过ProcessMsg[15]来实现。Thread将收到的消息交给Actor处理,Actor处理消息过程中可能会触发执行(Act),执行会Launch其内部的Kernel。执行结束会向上下游Actor发消息。运行时的去中心化调度就是靠着Actor之间的消息通信所实现的。

Actor内部有多种MsgHandler来处理消息(HandlerNormal和HandlerZombie)。在Actor正常运行过程中都使用HandlerNormal来处理消息。HandlerZombie用于Actor在有序退出时的消息管理。

2.3 更改Actor工作状态

Runtime()的最后一步,当所有的Actor初始化完毕后,通过:SendCmdMsg(source_tasks, ActorCmd::kStart);往消息队列中发送kStart启动信号,触发source task相关的Actor处于启动状态,之后,整个运行时系统便处于待命状态,等待接受到任务启动信号后,才会正式执行。BufferMgr中获取数据,驱动相关actor任务的执行、展开。

2.4 任务执行

经过上面Runtime启动、Actor完成初始化后,整个运行时系统便处于待命状态,当python前端实际发出job执行的指令后,数据便开始从BufferMgr灌入相关的actor,正式开始任务的执行、数据流转和传递,直到整个Plan执行完成。

在文章《一个Job在OneFlow中的执行过程—上篇》[16]中,我们知道通过:session.TryInit().LazyRun(job_func, *args, **kwargs)[17]中的TryInit()完成了Job任务的逻辑图、物理图编译、生成运行时计划Plan,Plan生成后还会进一步完成Actor的初始化以及Runtime()的启动。在这之后,通过LazyRun()实际触发了Runtime的执行。

LazyRun[18]代码如下:

def LazyRun(self, job_func, *arg):print("enter oneflow/python/framework/session_util.py >> LazyRun()")assert self.status_ is SessionStatus.RUNNINGremote_blobs = self.LaunchUserJob(job_func, *arg)if remote_blobs is None:returnfuture_blob = LazyFutureRemoteBlobs(self).SetResult(remote_blobs).Inited()annotation = inspect.signature(job_func).return_annotationreturn oft_util.TransformGlobalFunctionResult(future_blob, annotation)

LazyRun的核心为LaunchUserJob:

def LaunchUserJob(self, job_func, *arg):assert self.status_ is SessionStatus.RUNNINGprint("enter method >>>>>>>>>>>>>>>>>>>>>>>>> LaunchUserJob()")job_name = job_func.__name__push_util.AsyncPush(self, job_func, *arg)print("enter method >>>>>>>>>>>>>>>>>>>>>>>>> LaunchJob():", job_name)self.LaunchJob(job_instance_util.MakeUserJobInstance(job_name))return job_func.__oneflow_output_remote_blobs__

LaunchUserJob顾名思义,即启动用户定义的user job。在实际执行LaunchJob前,会通过push_util.AsyncPush添加并触发执行系统的push job使得数据从BufferMgr灌入相应actor,之后通过self.LaunchJob开启整个任务的实际执行。

我们在如下示例代码中,可以看见LaunchJob的过程依次为push job >> user job(pad_Job) >> pull job:

import oneflow as flow
import oneflow.typing as tp
import numpy as np@flow.global_function()
def pad_Job(x: tp.Numpy.Placeholder((2, 1, 3, 3))) -> tp.Numpy:with flow.scope.placement("cpu", "0:0"):loss = flow.reflection_pad2d(x, padding=1)return lossx = np.arange(18).reshape((2, 1, 3, 3)).astype(np.float32)
print(" >>>>>>>>>>>>>>>>>>>>> pad_Job begin")
y = pad_Job(x)
print(" >>>>>>>>>>>>>>>>>>>>> pad_Job done!")
print("in:\n", x, "y:\n", y)

输出:

>>>>>>>>>>>>>>>>>>>>> pad_Job begin
E0409 21:45:57.386700 1368235 env_global_objects_scope.cpp:96] using rpc backend: local
enter oneflow/python/framework/session_util.py >> LazyRun()
enter method >>>>>>>>>>>>>>>>>>>>>>>>> LaunchUserJob()
enter method >>>>>>>>>>>>>>>>>>>>>>>>> AsyncPush.LaunchJob: System-Push-Input_0
enter method >>>>>>>>>>>>>>>>>>>>>>>>> LaunchJob(): pad_Job
enter oneflow/python/framework/pull_util.py >> SetResult()
enter method >>>>>>>>>>>>>>>>>>>>>>>>> AsyncPull.LaunchJob: System-Pull-Return_2>>>>>>>>>>>>>>>>>>>>> pad_Job done!
in:[[[[ 0.  1.  2.][ 3.  4.  5.][ 6.  7.  8.]]][[[ 9. 10. 11.][12. 13. 14.][15. 16. 17.]]]] y:[[[[ 4.  3.  4.  5.  4.][ 1.  0.  1.  2.  1.][ 4.  3.  4.  5.  4.][ 7.  6.  7.  8.  7.][ 4.  3.  4.  5.  4.]]][[[13. 12. 13. 14. 13.][10.  9. 10. 11. 10.][13. 12. 13. 14. 13.][16. 15. 16. 17. 16.][13. 12. 13. 14. 13.]]]]

2.5 任务结束

当所有的actor依次执行并完成(running_actor_cnt变为0)后,标志Plan中所有任务已完成(结束),同时也触发了Runtime()的析构[19],所有系统全局对象依次销毁,整个Plan的生命周期完结:

Runtime::~Runtime() {Global<RuntimeCtx>::Get()->WaitUntilCntEqualZero("running_actor_cnt");OF_SESSION_BARRIER();DeleteAllGlobal();
}

参考资料

[1]

oneflow/oneflow/core/job/task.proto: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/task.proto

[2]

《仅此一文让您掌握OneFlow框架的系统设计(下篇)》: https://zhuanlan.zhihu.com/p/339208452

[3]

oneflow/core/actor/actor_message.h: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/actor/actor_message.h#L24

[4]

RegstDescProto: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/register/register_desc.proto%23L32

[5]

MemCase: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/memory/memory_case.proto%23L17

[6]

Oneflow::Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/oneflow.cpp#L991

[7]

oneflow/core/job/runtime.cpp#L63: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/runtime.cpp#L63

[8]

《仅此一文让您掌握OneFlow框架的系统设计(下篇)》: https://zhuanlan.zhihu.com/p/339208452

[9]

CommNet: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/comm_network/comm_network.h%23L36

[10]

MemoryAllocator: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/memory/memory_allocator.h%23L24

[11]

RegstMgr: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/register/register_manager.h%23L32

[12]

ActorMsgBus: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/actor/actor_message_bus.h%23L24

[13]

ThreadMgr: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/thread/thread_manager.h%23L28

[14]

oneflow/oneflow/core/actor/actor.cpp: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/actor/actor.cpp#L39

[15]

ProcessMsg: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/actor/actor.h#L44

[16]

一个Job在OneFlow中的执行过程—上篇》: https://www.yuque.com/zhaoluyang/ai/ss1gc9#2t74Q

[17]

session.TryInit().LazyRun(job_func, *args, **kwargs): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L221

[18]

LazyRun: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L277

[19]

Runtime()的析构: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/runtime.cpp#L91

网页端点击阅读原文,获得更好的阅读体验

一个Job在OneFlow中的执行过程—下篇相关推荐

  1. 一个Job在OneFlow中的执行过程—中篇

    上一篇文章<一个Job在OneFlow中的执行过程-上篇>,以bottom up的角度,简单讲解了一个Job(用户定义的训练/预测任务)在Oneflow中的调用入口.数据流转过程.从pyt ...

  2. 通过分析一个C程序的汇编指令执行过程,理解计算机的工作。

    郑德伦 原创作品转载请注明出处 <Linux内核分析>MOOC课程 http://mooc.study.163.com/course/USTC-1000029000 首先创建一个C程序的文 ...

  3. mysql执行一条sql语句的完整过程,sql语句在mysql中的执行过程

    文章目录 1. SQL语句在mysql的执行过程 一:客户端层 二:mysql服务器层 三:innoDB引擎层 2. undo日志.redo日志.binlog日志的区别? 1. SQL语句在mysql ...

  4. java方法在java虚拟机中的执行过程

    概述 在之前"Java运行时内存如何分配?"这篇文章中,曾经提到过Java在执行方法时,借助于Java虚拟机栈来实现方法的调用与执行,但具体是如何执行的呢? 本篇文章就主要来解决这 ...

  5. 从 JVM 角度看 Spring 中方法执行过程和内存状态

    Spring 容器中的 Bean 默认是单例的,也就是说我们在使用被Spring管理的对象的时候,不需要重复创建了,从而节省空间. 举个例子: 作为一个 CRUD 攻城狮,平时开发最多的就是一个Web ...

  6. jos中boot执行过程

    根据代码注释来看: 开启cpu,切换到32位保护模式,跳转到c代码.BIOS会将硬盘上第一个扇区加载到内存中,同时在实模式下cs=0,ip=7c00开始执行 1.16位模式开始执行,关闭中断,将DF置 ...

  7. 计算机指令在CPU中的执行过程(图文版)

    为了了解指令的大概流程,下面以加法指令做以说明(引用<计算机原理>). 指令形式: ADD EA 该指令一个隐含的操作数存在累加器(AC)中,EA为另一个操作数在主存当中的有效地址. 该指 ...

  8. Spring中BeanPostProcessor 执行过程

    1.刷新容器 2.在refresh()方法中 执行 // Instantiate all remaining (non-lazy-init) singletons. // 初始化剩下的非延迟加载(no ...

  9. shell脚本循环执行一个linux命令,Linux中循环执行shell命令的方法

    Linux命令行,循环执行shell命令 死循环 命令格式while true ;do ; done; 可以将 command 替换为任意命令. 下面以echo "hello"; ...

最新文章

  1. oracle缩减临时表空间,oracle的临时表空间写满磁盘空间解决改问题的步骤
  2. MVC设计模式深入理解
  3. NYOJ 56 阶乘因式分解(一)
  4. Struts1 tag
  5. EF架构~codeFirst从初始化到数据库迁移
  6. 第50课 书香阁的座位数
  7. MessageBox.Show常用的2个方法
  8. Restorator 2007 Build 1709 韦斯特*金 汉化版
  9. 金融行业,保险行业软件测试分析
  10. windows创建bat文件进行截图
  11. 何钦铭c语言第三版第3章答案,何钦铭版C语言第3章答案.pdf
  12. 用计算机排序excel,【用excel名字排序的方法有哪些?这些简便的计算机技能你一定需要】- 环球网校...
  13. 山东大学 计算机人工智能2019级 认知科学与类脑计算 期末考试
  14. 面试题-redis数据类型
  15. Zhong__Linux系统安装MongoDB数据库
  16. 本周最新文献速递20211212
  17. streamsets自定义插件部署方案
  18. Python一些可能用的到的函数系列76 最大回撤率
  19. 高效解决PPA软件源卡顿出错的问题(ppa.launchpad.net)
  20. MAC 磁盘清理工具 ncdu

热门文章

  1. java受检异常与运行时异常
  2. 电脑外接显示屏字体和图标过大
  3. WEB安全零基础入门到进阶教程
  4. 13位数字转日期 oracle_Oracle日期格式转换
  5. 服务器中Redis清除缓存
  6. 数据标注丨关于3D点云的这些知识,你知道几个?
  7. 分享6款好用却小众的软件
  8. 用Adobe Photoshop CC(ps)做宝马logo
  9. (专升本)数字多媒体技术基础(音/视频处理软件)
  10. mac简体拼音打出来是英文_Mac如何打出各种标点符号、特殊符号、注音文、全角英文?...