[源码解析] PyTorch 流水线并行实现 (1)--基础知识
0x00 摘要
本系列开始介绍PyTorch的流水线并行实现。实质上,PyTorch就是 GPipe 的PyTorch版本。这些开源软件在互相借鉴思路,互相学习,从 PyTorch 的源码注释中,可以见到我们之前介绍的部分框架/库的引用或者论文链接。
流水线并行其他文章链接如下:
[源码解析] 深度学习流水线并行Gpipe(1)---流水线基本实现
[源码解析] 深度学习流水线并行GPipe (2) ----- 梯度累积
[源码解析] 深度学习流水线并行 GPipe(3) ----重计算
[源码解析] 深度学习流水线并行之PipeDream(1)--- Profile阶段
[源码解析] 深度学习流水线并行 PipeDream(2)--- 计算分区
[源码解析] 深度学习流水线并行 PipeDream(3)--- 转换模型
[源码解析] 深度学习流水线并行 PipeDream(4)--- 运行时引擎
[源码解析] 深度学习流水线并行 PipeDream(5)--- 通信模块
[源码解析] 深度学习流水线并行 PipeDream(6)--- 1F1B策略
本文图来自论文和github源码。
0x01 历史
我们首先介绍一下来龙去脉。
1.1 GPipe
从前面系列文章我们知道,GPipe是Google Brain发布的可伸缩流水线并行库,它允许高效地训练大型的消耗内存的模型。其论文是:
`GPipe: Efficient Training of Giant Neural Networks using Pipeline Parallelism
<https://arxiv.org/abs/1811.06965>`
GPipe实质是一个模型并行的库,当模型的大小对于单个GPU来说太大时,训练大型模型可能会导致内存不足。为了训练如此大的模型,GPipe把一个多层网络分割成若干个复合层,然后每个复合层被部署到GPU/TPU之上。但是这若干个复合层只能顺序并行,这就严重影响了训练速度。所以GPipe引入了流水线并行机制(pipeline parallelism
),在不同的GPU设备之间对层进行流水线处理。另外,GPipe 也使用了重新计算这个技巧来降低内存,这样可以训练更大的模型。
Gpipe首先将模型分片到不同的设备上,其中每个设备承载模型的一个分片。碎片可以是单个层或一系列层。然后 Gpipe将一小批数据分割成微批次,并将微批次给承载第一个碎片的设备。每个设备上的层做如下操作:
- 对接受到的微批次进行处理,并将输出发送到后续设备。
- 同时,它已准备好处理来自上一个设备的微批次。
通过以这种方式对输入进行管道连接,Gpipe能够减少设备的空闲时间。可以简化大型模型的训练。
1.2 torchgpipe
因为 GPipe 是基于 TensorFlow 的库(这是Google的产品嘛),所以kakaobrain的一些工程师就用PyTorch 来实现了 GPipe,并且开源出来,这就是 torchgpipe,其地址为:https://github.com/kakaobrain/torchgpipe,用户可以通过 pip install torchgpipe 进行安装使用。
该作者团队还发表了一篇论文,具体如下:https://arxiv.org/pdf/2004.09910.pdf
1.3 fairscale
FairScale是Facebook的一个PyTorch扩展库,用于高性能和大规模培训。该库扩展了PyTorch的基本功能,同时添加了新的SOTA缩放技术。FairScale以可组合模块和简易API的形式来提供了最新的分布式训练技术。这些API可以作为研究人员的基本工具,因为其可以使用有限的资源来训练大模型。
其开源地址为:https://github.com/facebookresearch/fairscale.
从其内部源码结构和文档看,个人认为其是Facebook 内部对于市面上各种最新深度学习库的一个收集/实验/探索的试验田。如果实验成熟,Facebook 就把部分代码合并到 PyTorch 之中。
比如从下面可以看到,Facebook 正在/曾经试验如下:
fairscale.nn.pipe is forked from torchgpipe, Copyright 2019, Kakao Brain, licensed under Apache License.
fairscale.nn.model_parallel is forked from Megatron-LM, Copyright 2020, NVIDIA CORPORATION, licensed under Apache License.
fairscale.optim.adascale is forked from AdaptDL, Copyright 2020, Petuum, Inc., licensed under Apache License.
fairscale.nn.misc.flatten_params_wrapper is forked from PyTorch-Reparam-Module, Copyright 2018, Tongzhou Wang, licensed under MIT License.
1.4 PyTorch
从 2021-03-18,PyTorch 1.8.0 Release Notes 之中可以看到。
- Upstream
fairscale.nn.Pipe
into PyTorch astorch.distributed.pipeline
(#44090)
这里正式引入了 pipeline。
https://github.com/pytorch/pytorch/pull/44090 的内容是:
Stack from ghstack:
- #44090 Pull in fairscale.nn.Pipe into PyTorch.
This is an initial commit pulling in the torchgpipe fork at https://github.com/facebookresearch/fairscale.
The purpose of this commit is to just pull in the code and ensure all tests and builds work fine. We will slowly modify this to match our intended API mentioned in #44827. Follow up PRs would
address further changes needed on top of the initial commit..We're pulling the code into the
torch.distributed._pipeline.sync
package. The package is private on purpose since there is a lot of work (ex: docs, API changes etc.) that needs to go in before we can actually officially support this.
需要注意一点是,torchgpipe 这部分代码被合并到 torch/distributed/pipeline/sync 之下,这说明后续 PyTorch 也许会合并一个 async 实现,没准就是 PipeDream。
1.5 基础版本
因为这部分源码在 PyTorch 之中基本未做改变。所以,我们还是以 torchgpipe 原始代码作为例子来进行说明。
本文只是选取重要功能进行讲解,像 Deferred Batch Normalization
和 Skip Connections
就不会分析,读者如果有兴趣,可以自行研究。
0x02 基础知识
在 torchgpipe 和 fairscale 的源码内部,都涉及和介绍了大量相关知识,了解这些知识有助于我们更好的学习源码,也可以回头对 GPipe 的分析做相应印证。
某些知识可能与之前文章有所重复,但是torchpipe和fairscale给出了自己的见解,我们可以再学习一下。
2.1 流水线并行
GPipe将一个模型拆分为多个分区,并将每个分区放置在不同的设备之上,这样可以增加内容容量。例如,我们可以拆分一个占用40GB CUDA内存的模型分为4个分区,每个分区占用10GB。
这种方法称为“模型并行"。然而,典型的深度学习模型由连续的层组成。换句话说,后面的层在前一层完成之前是不会工作的。如果一个模型是由完全连续的层构成,即使我们将模型扩展到两个或多个层上,同一时间也只能使用一个设备。
GPipe将一个小批量(mini-batch)拆分为多个微批量(micro-batches),以使设备尽可能并行工作,这被称为“流水线并行"。
基本上,流水线并行是一个小型数据并行的栈。当每个分区处理完一个微批次后,它可以将输出抛到下一个分区并立即开始下一个微批次的工作,这样分区就可以重叠。
因为每个分区都必须等待前一个分区输入作为第一个微批次来处理,所以流水线之上仍然有空闲时间,我们称之为 “bubble"。
通过选择较小尺寸的微批次,可以减少“bubble"。但通常,较大的批量可以更有效地利用GPU。因此,如果选择的微批量太小,GPU可能未得到充分利用。另外,更快的分区应该等待相邻的较慢分区,分区之间的不平衡也可能导致GPU利用率不足。因此,总体性能由最慢的分区决定。
2.2 Checkpointing
2.2.1 基本概念
Checkpointing 是一种用于减少训练期间GPU内存使用的技术。这是通过避免在向前传递期间存储中间激活张量来实现的。具体而言,Checkpointing 在正向传播过程中,只会记住分区边界处的张量,所有其他中间张量都不会记住,而是在向后传播过程中跟踪原始输入来重新计算向前传播。因此,隐藏层消耗的内存仅为带有检查点的单个微批次所需要的数量。
Checkpointing 是性能和内存之间的折衷,因为如果完全重计算,则所花费的时间与正向传播所花费的时间相同。但 Checkpointing 减少了存储大型激活张量的需要,从而允许我们增加批量大小,增加模型的净吞吐量。
2.2.2 使用
在 GPipe之中,Checkpointing 应用于每个分区,以最小化模型的总体内存消耗。
Checkpointing 会极大减少内存使用,但总体训练速度会降低25%左右。您可以处理如何在模型上应用检查点。Checkpointing 只有三种选择,不能够指定某些特定点:
-
"always" :在所有微批次上应用检查点。
-
"except_last" : 在最后一个微批次之外应用检查点。
-
"never" :从不应用检查点。
@pytest.mark.parametrize('checkpoint', ['never', 'always', 'except_last'])
通常,在最后一个微批次上的检查点可能没有用处,因为保存的内存将立即重建。这就是为什么我们选择"except_last"作为默认选项。如果您决定根本不使用检查点,那么<torch.nn.DataParallel>
可能比GPipe更有效。
2.2.3 实现概述
Checkpointing 已经作为“torch.utils.checkpoint.checkpoint_wrapper"API的一部分实现,通过该API可以包装前向过程中的不同模块。
Checkpointing 通过重写“torch.autograd.Function"来实现。在处理模块前向传递的“forward"函数中,如果使用“no_grad",我们可以在很长一段时间内(即直到反向传播之前)防止正向图的创建和中间激活张量的物化。相反,在后向传播期间,会再次执行前向传播,然后执行后向传播。
前向传播过程的输入使用上下文对象保存,然后在后向传播过程中访问该上下文对象以检索原始输入。PyTorch还保存了RNG(Random Number Generator)的状态,用于前向传播和后向传播,如 Dropout layers 所需。
以下是几个注意点:
-
内存节省完全取决于检查点所包装的模型和分段。每个backprop由几个迷你前向传播(mini-forward)和backprop过程组成。收益完全取决于每层激活值的内存占用。
-
使用BatchNormalization时,您可能需要冻结统计数据的计算,因为我们运行了两次正向传递。
-
确保输入张量的'requires_grad'字段设置为True。为了触发后向传播功能,输出需要设置此字段。通过在输入张量设置这个字段,我们可以确保将其传播到输出,并触发'backward'函数。
2.3 微批次的数目
微批量大小的选择会影响GPU的利用率。较小的微批量可以减少等待先前微批次输出的延迟,但较大的微批量可以更好地利用GPU。因此,关于微批次数量,存在了一个权衡,即每个微批次的GPU利用率和bubble总面积之间的权衡,用户需要为模型找到最佳的微批次数量。
与大的微批次相比,在处理许多小的微批次时,GPU可能会减慢速度。如果每个CUDA内核太便宜而无法计算,那么GPU将无法得到充分利用,因此太小的微批次将导致利用率不足。另一方面,当每个微批次的尺寸减小时,气泡的面积也相应减少。理想情况下,用户应该选择可以提高GPU利用率的最大数量的微批次。
作为补充说明,批次尺寸越小,性能越差。大量的微批次可能会对使用BatchNorm的模型的最终性能产生负面影响,就像 torch.nn.DataParallel
那样。
2.4 检查重计算
GPipe中的检查点执行两次前向传播。第二个前向传播称为“重新计算"。
诸如<torch.nn.BatchNorm2d>
之类的模块在每次正向传播时,如果更新其批处理统计信息,可能就会导致问题。因此,在重新计算期间,不应再次更新正在运行的估计值。为了避免再次更新运行估计,模块的“forward"方法需要能够检测到这是重新计算。
~torchgpipe.is_recomputing
方法可以检测重新计算,在重运行期间,这个方法会返回True
。
class Counter(nn.Module):
def __init__(self):
super().__init__()
self.counter = 0
def forward(self, input):
if not is_recomputing():
self.counter += 1
return input
另外,如果把~torchgpipe.GPipe
的成员变量 deferred_batch_norm
设置为 True,则可以阻止再次更新运行统计。
0x03 使用
3.1 示例
要使用GPipe训练模块,只需将其用 torchgpipe.GPipe
来包装即可,但是用户的模块必须是<torch.nn.Sequential>
的实例。
GPipe 会将自动将模块分割为多个分区,分区是在单个设备上一起运行的一组连续层,其中:
balance
参数确定每个分区中的层数。
chunks
参数指定微批处理的数量。
下面的示例代码显示了如何将具有四层的模块拆分为两个分区,每个分区有两层。此代码还将一个小批次 mini-batch 拆分为8个微批次(micro-batches)
from torchgpipe import GPipe
model = nn.Sequential(a, b, c, d)
model = GPipe(model, balance=[2, 2], chunks=8)
# 1st partition: nn.Sequential(a, b) on cuda:0
# 2nd partition: nn.Sequential(c, d) on cuda:1
for input in data_loader:
output = model(input)
~torchgpipe.GPipe
使用CUDA进行训练。用户不需要自己将模块移动到GPU,因为~torchgpipe.GPipe
自动把每个分区移动到不同的设备上。默认情况下,可用的GPU从cuda:0
开始,并且按顺序为每个分区选择可用GPU。用户也可以利用device
参数指定使用的GPU。
model = GPipe(model,
balance=[2, 2],
devices=[4, 2], # Specify GPUs.
chunks=8)
3.2 输入输出
与典型module不同,GPipe之中,输入设备与输出设备不同,除非只有一个分区。这是因为第一个分区和最后一个分区被放置在不同的设备上。因此,必须将输入和目标移动到相应的设备。可以通过 torchgpipe.GPipe.devices 的属性来完成,这个属性保存了每个分区的设备列表.
in_device = model.devices[0]
out_device = model.devices[-1]
for input, target in data_loader:
# input on in_device
input = input.to(in_device, non_blocking=True)
# target on out_device
target = target.to(out_device, non_blocking=True)
# output on out_device
output = model(input)
loss = F.cross_entropy(output, target)
loss.backward()
...
3.3 嵌套序列(Nested Sequentials)
当~torchgpipe.GPipe
拆分一个<torch.nn.Sequential>
module时候,它将模块的每个子模块视为单一的、不可分割的层。然而,模型事实上并不一定这样,有些子模块可能是另一个顺序模块,可能需要进一步拆分它们。
GPipe 不会支持这些嵌套的 Sequentials module,所以用户需要把module打平(flatten the module)。还好,这在PyTorch中并不难。以下代码段显示了嵌套顺序模块如何展平:
_3_layers = nn.Sequential(...) # len(_3_layers) == 3
_4_layers = nn.Sequential(...) # len(_4_layers) == 4
model = nn.Sequential(_3_layers, _4_layers) # len(model) == 2
def flatten_sequential(module):
def _flatten(module):
for name, child in module.named_children():
if isinstance(child, nn.Sequential):
for sub_name, sub_child in _flatten(child):
yield (f'{name}_{sub_name}', sub_child)
else:
yield (name, child)
return nn.Sequential(OrderedDict(_flatten(module)))
model = flatten_sequential(model) # len(model) == 7
model = GPipe(model, balance=[2, 3, 2], chunks=4)
3.4 典型模型并行
典型的模型并行(Typical Model Parallelism)是GPipe的一个特例。模型并行性是相当于禁用了微批处理和检查点的GPipe,可以通过chunks=1
和 checkpoint='never'
来做到。
model = GPipe(model, balance=[2, 2], chunks=1, checkpoint='never')
至此,库的历史和基本知识已经介绍完毕,下一篇我们介绍 Auto balance。
0xFF 参考
https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html#stream-sync-behavior
NVIDIA解决方案架构师深度解析大规模参数语言模型Megatron-BERT
Accelerating Wide & Deep Recommender Inference on GPUs
HugeCTR: High-Performance Click-Through Rate Estimation Training
https://discuss.pytorch.org/t/how-to-prefetch-data-when-processing-with-gpu/548
https://github.com/NVIDIA/apex/
https://github.com/justheuristic/prefetch_generator
https://pytorch.org/tutorials/intermediate/model_parallel_turotial.html
https://pytorch.org/docs/stable/autograd.html
https://pytorch.org/docs/notes/cuda.html
https://zhuanlan.zhihu.com/p/61765561