这篇文章是 OneFlow 源码的学习笔记,主要涉及如下几个内容:
- Python 装饰器的使用
- OneFlow 是如何获取前端的计算表达式的
- 编译触发的时机
之前我没有学过、用过 Python 装饰器,在 OneFlow 中看到了之后,就顺便学一下。看源码是学语言的最好方式hhh。
问题
这个问题是在阅读 [1] 的时候产生的,OneFlow 是如何获取前端的计算表达式的,什么时候进行编译?
import oneflow as flow
from oneflow.python.framework.typing import Numpy
import oneflow.typing as tp
import numpy as np
@flow.global_function()
def zero_pad(x: tp.Numpy.Placeholder((2, 1, 3, 3))) -> tp.Numpy:
with flow.scope.placement('cpu', '0:0'):
unused = flow.reflection_pad2d(x, padding=1)
out = flow.zero_pad2d(x, padding=2)
return out
@flow.global_function()
def reflection_pad(x: tp.Numpy.Placeholder((2, 1, 3, 3))) -> tp.Numpy:
with flow.scope.placement('cpu', '0:0'):
loss = flow.reflection_pad2d(x, padding=1)
return loss
x = np.arange(18).reshape((2, 1, 3, 3)).astype(np.float)
y = zero_pad(x)
y1 = zero_pad(x)
y2 = reflection_pad(x)
print('in:
{}
out:
{}
out1:
{}
out2:
{}'.format(x, y, y1, y2))
print('{} {} {} {}'.format(type(x), type(y), type(y1), type(y2)))
OneFlow 可以大致分为三个部分,前端、编译期、运行时。前端部分定义的计算是如何获取到的,以一种什么样的中间格式传给底层,更具体一点 OneFlow 底层如何获取到 reflection_pad2d 这个算子的呢?
Python 装饰器
形如 @flow.global_function()
这样的东西,放在函数定义上面,这东西在 Java 中叫注解,在 Python 中,它叫做装饰器。链接 [2] 是关于装饰器的,扫了一眼,没细看,感觉还行吧。
一个简单的例子
def preprocess(func):
def Decorator(*args):
print('preprocess')
func(*args)
return Decorator
@preprocess
def hello(a, b):
print('{} says hello to {}'.format(a, b))
hello('Jack', 'Rose')
输出:
preprocess
Jack says hello to Rose
分析:被装饰的 hello 函数,被传入到 preprocess 这个函数里面,经过 Decorator 封装后返回,将结果赋值给一个叫做 hello 的变量。
- 如果 preprocess 没有返回值,那么 hello 是一个 None。
- 如果 preprocess 返回的 Decorator 没有调用 hello,那么 print 就不会执行
一个复杂的例子
def process(*names):
def Decorator(fun):
print(*names)
def new_func(*args, **kwds):
print('preprocess')
fun(*args, **kwds)
print('postprocess')
return new_func
return Decorator
@process('good')
def hello(a, b):
print('{} says hello to {}'.format(a, b))
hello('Jack', 'Rose')
输出:
good # 这个只会执行一次,后面的每调用一次 hello,就出现一次。
preprocess
Jack says hello to Rose
postprocess
分析:这个例子和上面那个其实没有本质区别。这次的装饰器可以接收参数,并且返回一个函数闭包,接收的这些参数可以在返回的函数中访问(所谓的闭包)。因为返回了一个函数,所以会将函数 hello 传入到那个函数里面,进行装饰,后面就类似上面简单的例子了。
本质
我认为装饰器的本质就是函数,接受的参数是函数,并且会将函数返回的内容赋值给定义的变量,上面定义的变量就是 hello。如果返回值是函数,那么 hello 就是函数。当然你也可以返回一个常量,后面将 hello 打印出来,会看到 hello 的值。
调用流程分析
跟着上面的代码,单步调试。下面记录几个重要的函数调用和过程。(2021/08/30: 重读下面这段,我觉得写得很不好。代码细节已经忘记了,如果要单步调试,看下面的分析又不是很有意义了。分析源代码,尽量还是要带上源代码吧。在源代码之前,尽可能说清楚不同源代码之间的整体逻辑关系。)
- Python 逐行执行,执行到了 @flow.global_function()
- 跳进 api_oneflow_function,获取一个函数,这个函数是装饰器 (function_util.py:89)
- 上面代码获取到的是 lazy_oneflow_function (function_util.py:151)
- 经过装饰器之后,被装饰的函数最终会执行的方法是 _RunLazyJob (function_util.py:162)
- 被 @flow.global_function() 装饰的函数,第一次调用的时候,执行初始化 TryInit,后续调用不再重复初始化 (session_util.py:167)
- 初始化的时候,进行编译。(session_uti.py:198)
- 在编译之前需要设置好环境,包括设置好 JobBuildAndInferCtx,这里还设置了 job_name,所以后面才可以获取到 (compiler.py:63)
- 接下来执行 _CompileJob,其中会执行用户定义的函数,也就是最前面给的代码中的 zero_pad 和 reflection_pad 两个函数。(compiler.py:113)
- zero_pad 和 reflection_pad 这两个函数内部执行的时候,就是到了具体的算子。每个算子的结尾都会去调用 user_op_builder.py 的方法。比较重要的一个方法是 InferAndTryRun,这个方法将会调用 CurJobAddOp 添加算子到图里(吧)。这也是为什么后面调用 CurJobBuildAndInferCtx_Complete 可以找到这些算子。 (user_op_builder.py:173)
- 之后再次执行这些函数的时候,会跳过 TryInit,执行 LazyRun。(session_util.py:280)
问与答
Q: 编译触发的时机
第一次调用自定义的 Job 函数的时候。最开始的代码里面,第一次调用 zero_pad 的时候,触发编译,启动。
Q: OneFlow 是如何获取前端的计算表达式的
执行用户定义的 Job 函数,里面调用了 user_op_builder.py 的方法 InferAndTryRun,通过 CurJobAddOp 这个函数,将算子的配置,输入输出等信息发送到底层。Op 还有其他方法,比如设置输入 Input 和输出 Output。
Q: 以一种什么样的中间格式传给底层
user_op_builder.py:173,传一个 op_conf 给底层,op_conf 是一个 ProtoBuf message OperatorConf。
OneFlow 中的装饰器
实际上,当调用自己定义的 job 函数的时候,真正执行的是 _RunLazyJob 的内容。
# v0.3.5 function_util.py:148
@enable_if.condition(
hob.in_normal_mode & ~hob.eager_execution_enabled & ~hob.session_initialized
)
def lazy_oneflow_function(function_config=FunctionConfig()):
assert isinstance(function_config, FunctionConfig)
def Decorator(job_func):
if not hasattr(job_func, "__oneflow_function_signature__"):
job_func.__oneflow_function_signature__ = inspect.signature(job_func)
oft_util.CheckGlobalFunctionAnnotation(job_func.__oneflow_function_signature__)
sess = session_ctx.GetDefaultSession()
@functools.wraps(job_func)
def Func(*args, **kwargs):
return _RunLazyJob(sess, job_func, *args, **kwargs)
sess.AddJob(_CloneFunctionDesc(function_config.function_desc, job_func))
for x in dir(job_func):
if x.startswith("__oneflow_"):
setattr(Func, x, getattr(job_func, x))
return Func
return Decorator
参考链接
[1] https://zhuanlan.zhihu.com/p/344531540
[2] https://zhuanlan.zhihu.com/p/78500405